On Tuesday, April 29, 2014 at 4:50:46 AM UTC+8, rkuhn wrote:
>
> Yang Bo,
>
> thanks for experimenting with and sharing this: when macros started to 
> materialize two years ago I had the same thought and proposed (internally) 
> to investigate what I called messageflow at the time (inspired by our 
> dataflow module which has been treated in the same fashion, going from CPS 
> plugin to async/await macros). This model makes for a nice and direct way 
> of formulating temporary intermediate states within actors, typically 
> encountered while waiting for another actor to reply. Interestingly, the 
> original work on the Actor model (in particular by Gul Agha) already 
> contains this as a primitive of the actor language.
>
> On the other hand I have learned a few more things over the past year that 
> are related to this topic as well. Perhaps the most concise write-up is 
> this issue<https://github.com/reactive-streams/reactive-streams/issues/46> on 
> the Reactive Streams repository. What you are doing is that the actor is 
> not technically blocked (in the sense of parking a thread), but it is still 
> logically blocked, since it cannot handle any message other than the reply 
> that is being awaited—everything else would violate the Actor model in that 
> concurrent entry into the behavior is not allowed: handling a message must 
> determine the behavior that applies to the next message, and this process 
> is applied strictly in sequence. We discussed this topic also with Akara 
> Sucharitakul and Justin du Coeur on this list a few months ago (search for 
> the Aggregator Pattern), and what we concluded was that mixing a normal 
> actor (with the current behavior as the message entry point) with an 
> `await` scheme (that adds secondary message entry points) would lead to 
> confusing behavior in that e.g. a Cancel message would not have the 
> intended effect:
>
>   def receive = {
>     case DoWork =>
>       // this uses become() under the hood
>       val r = otherActor ? Query await 5.seconds
>       sender() ! Reply(transform(r))
>     case Cancel =>
>       context.stop(self)
>   }
>
>
stateless-future-akka works well in these cases, because it supports 
exception handling, which the continuations plugin and scala.async do not. 
You just throw an exception when you receive Cancel:


// Thrown when a Cancel message arrives in the middle of a conversation.
case class CancelException(cancel: Cancel) extends Exception

// Yields the next message, converting Cancel into an exception.
def nextSaneMessage = Future {
  nextMessage.await match {
    case cancel: Cancel => throw CancelException(cancel)
    case saneMessage => saneMessage
  }
}

override def receive = Future {
  while (true) {
    try {
      val message1 = nextSaneMessage.await
      sender ! s"Thank you for your $message1"
      val message2 = nextSaneMessage.await
      sender ! s"Thank you for your $message2"
    } catch {
      case e @ CancelException(cancel) => {
        // handling the cancel case
      }
    } finally {
      // cleanup here
    }
  }
  throw new IllegalStateException("Unreachable code!")
}
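
To try the control flow without Akka or stateless-future-akka, here is a 
rough synchronous sketch in plain Scala. It is only an illustration: Mailbox, 
Text, CancelDemo and converse are made-up names, a queue stands in for the 
actor mailbox, and a direct method call stands in for `.await`. It shows the 
same idea as above, where a Cancel arriving at any step of the conversation 
becomes an exception that lands in a single catch block.

```scala
import scala.collection.mutable

// Hypothetical message types, for illustration only.
sealed trait Message
case object Cancel extends Message
final case class Text(body: String) extends Message

// Raised when Cancel arrives where an ordinary message was expected.
final class CancelException extends Exception("cancelled")

// Stand-in for an actor mailbox; this is NOT the stateless-future-akka API.
final class Mailbox(initial: Message*) {
  private val queue = mutable.Queue[Message](initial: _*)

  // Takes the oldest pending message (assumes one is available).
  def nextMessage(): Message = queue.dequeue()

  // Same idea as nextSaneMessage above: Cancel becomes an exception.
  def nextSaneMessage(): Text = nextMessage() match {
    case Cancel  => throw new CancelException
    case t: Text => t
  }
}

object CancelDemo {
  // Runs one two-step conversation and returns everything it "sent".
  def converse(mailbox: Mailbox): List[String] = {
    val replies = mutable.ListBuffer.empty[String]
    try {
      val first = mailbox.nextSaneMessage()
      replies += s"Thank you for your ${first.body}"
      val second = mailbox.nextSaneMessage()
      replies += s"Thank you for your ${second.body}"
    } catch {
      case _: CancelException => replies += "cancelled" // the cancel case
    } finally {
      replies += "cleanup" // runs whether or not Cancel arrived
    }
    replies.toList
  }
}
```

Calling CancelDemo.converse(new Mailbox(Text("a"), Cancel)) takes the cancel 
branch after the first reply and still runs the cleanup step.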




 
