1) But if you share ExecutionContext with the actor (both using 
dispatcher's thread) then there cannot be a concurrent execution - it's 
just a question of whether "Future#onComplete" callback executes before 
receive partial function returns

2) I guess you are right, that even when using dispatcher's thread the 
"Future#onComplete" is probably executed after actor's 'receive' function 
returns, so even though it is done so by the same thread, it can happen 
after another Request is processed in the mean time because it was queued 
prior to "Future#onComplete" callback...

I think that the solution is to substitute "Future#onComplete" with 
"Future#map" and handling only Future error in "Future#recover" ... I'll 
try that out... Thank you

On Monday, May 25, 2015 at 11:26:09 AM UTC+2, √ wrote:
>
> Hi Jakub,
>
> Starting to read your email I definitely thought there must be something 
> mysterious at work!
>
> From what I can tell, there are a couple of compounding things here:
>
> 1) future.onComplete will be executed on another thread than the actor, or 
> "concurrently with the actor", this means that you can't close over the 
> actor and call methods on it from another thread, see: 
> http://doc.akka.io/docs/akka/2.3.11/additional/faq.html
>
> 2) when you call `Await` on the Future, you're only going to await it 
> having a value, not await its callbacks to finish execute.
> So:
>
> 1. val f = someFuture(…)
> 2. f.onComplete { … }
> 3. Await.result(f, …)
>
> When line 3 executes, onComplete could have already executed, is 
> (con)currently being executed or will be executed.
>
> Does that make sense?
>
>
> On Mon, May 25, 2015 at 11:03 AM, Jakub Liska <[email protected] 
> <javascript:>> wrote:
>
>> Hi,
>>
>> in other words :
>>
>> def receive: Receive = {
>>   case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>
>>
>>     // can it happen that another Request message comes before this partial 
>> function returns (while this one is being processed) ?
>>
>> }
>>
>>
>> I have an asynchronous ActorProvider that is scanning ElasticSearch 
>> index, but I'm calling "await" at the end, so it is basically blocking :
>>
>> private var lastScrollId: String = _
>>
>>
>> def receive: Receive = {
>>   case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>
>>     def pushRecursively(n: Long, scrollId: String): Future[Option[String]] = 
>> {
>>       require(scrollId != null && scrollId.nonEmpty, "Scroll id must be 
>> present!")
>>       scroll(scrollId) flatMap {
>>         case (sid, recs) if recs.isEmpty => // empty hits means end of 
>> scanning/scrolling
>>           Future.successful(Option.empty)
>>         case (sid, recs) =>
>>           onNext(recs)
>>           if (n > 1)
>>             pushRecursively(n-1, sid)
>>           else
>>             Future.successful(Option(sid))
>>       }
>>     }
>>
>>     val f = pushRecursively(Math.min(demand, totalDemand), lastScrollId)
>>     f onComplete {
>>       case Failure(ex) =>
>>         log.error(ex, "Unexpected ScanSource error")
>>         onError(ex)
>>         context.stop(self)
>>       case Success(sidOpt) => sidOpt match {
>>         case None =>
>>           log.info("ScanSource just completed...")
>>           if (isCompleted)
>>             log.warning("ScanSource already completed, I cannot figure out 
>> why this occurs!")
>>           else {
>>             onComplete()
>>             context.stop(self)
>>           }
>>         case Some(sid) =>
>>           lastScrollId = sid
>>       }
>>     }
>>     f.await(600.seconds)
>>
>>   case Cancel =>
>>     context.stop(self)
>> }
>>
>>
>> But as you can see, there is "log.warning" sayig that onComplete() was 
>> already called, which can happen only if ActorPublisher wasn't Requested 
>> sequentially.
>>
>> I think this implementation is correct and valid even though it is 
>> blocking actor's dispatcher thread. But I really cannot figure out how it 
>> can be "completed" twice...
>>
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected] <javascript:>.
>> To post to this group, send email to [email protected] 
>> <javascript:>.
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> Cheers,
> √
>  

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to