To add some more detail: getting such a Processor actor right is actually quite challenging. Offering that as a user-level API feels like handing you a loaded gun that shoots in unpredictable directions while being wrapped in barbed wire ;-) Sure, it is possible to implement it correctly, but I think you understand what I mean.
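
For comparison, the mapAsync-plus-ask route suggested in the quoted thread needs very little code. Here is a minimal sketch, assuming a recent Akka release (2.6-style API, where an implicit ActorSystem also supplies the materializer); the milestone API discussed in this thread may differ. The Worker actor, the "processed-" reply format and the MapAsyncAskSketch object are hypothetical names made up for illustration.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical actor holding the business logic; it replies to every request.
class Worker extends Actor {
  def receive: Receive = {
    case n: Int => sender() ! s"processed-$n"
  }
}

object MapAsyncAskSketch {
  // Runs a small stream through the actor and returns the collected replies.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    implicit val timeout: Timeout = Timeout(3.seconds) // required by ask (?)
    try {
      val worker = system.actorOf(Props(new Worker), "worker")
      val replies = Source(1 to 3)
        // Akka handles element passing and back-pressure; the actor only
        // sees one request per element and answers with a single reply.
        .mapAsync(parallelism = 2)(n => (worker ? n).mapTo[String])
        .runWith(Sink.seq)
      Await.result(replies, 5.seconds) // blocking is acceptable in a demo only
    } finally {
      system.terminate()
    }
  }
}
```

Calling MapAsyncAskSketch.run() collects the actor's replies in upstream order, since mapAsync emits results in the order of the incoming elements. The actor never has to deal with demand signalling itself.
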
mapAsync separates two concerns: stream element passing (done by Akka) and the business logic (done in the Actor you “ask”). We’ll eventually need to implement another primitive that allows the embedding of “sub-flows” that are materialized on demand or conditionally as part of a larger flow, i.e. a “flow-in-flow” wrapper. Going to the bare Reactive Streams interfaces inside a flow graph should never be necessary in my opinion; these are only intended for external interop.

Regards,

Roland

> On 31 Mar 2015, at 12:00, Konrad Malawski <[email protected]> wrote:
>
> Hi there,
> we talked about this briefly in the team yesterday; the outcome is that we
> do not want to expose such a low-level abstraction as ActorProcessor.
> Instead you'll be better off using mapAsync to integrate with an Actor,
> which then can do whatever it needs to do.
>
> Also, the current consensus about integrating with Actors as Sinks and
> Sources is that we'll provide a simple TellSink to drain streams into
> Actors as an additional "simple case",
> and we'll eventually update ActorPublisher / ActorSubscriber to be even
> easier to use (more like Stage perhaps).
>
> Hope this helps, happy hakking!
>
> On Sun, Mar 29, 2015 at 11:12 PM, <[email protected]> wrote:
> I looked through the code and AFAIU it's not very hard to add support for
> this feature. It's just a matter of adding another StageModule subclass
> which takes a Props argument, implementing handling of this Module inside
> the ActorProcessorFactory.props method (i.e. something like this:
>
>   def props(materializer: ActorFlowMaterializerImpl, op: StageModule,
>             parentAttributes: OperationAttributes): (Props, Any) = {
>     val att = parentAttributes and op.attributes
>     // USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW
>     // Also, otherwise the attributes will not affect the settings properly!
>     val settings = calcSettings(att)(materializer.settings)
>     op match {
>       case ActorProcessor(props) ⇒ (props, ())
>       ...
>
> ) and an additional Flow.apply(props: Props) which would create an
> ActorProcessor stage (just like apply(props: Props) for Sink and Source).
> It would allow us to integrate non-reactive stream components (e.g. a work
> pulling pattern implementation) right inside the Flow DSL without
> additional overhead. What do you think?
>
> Thanks in advance!
>
>
> On Sunday, March 29, 2015 at 5:15:42 AM UTC+3, [email protected] wrote:
> Just to clarify: are there any plans to add support for custom actor
> processors inside the Flow DSL (e.g. Flow(Props[ProcessorActor])) in the
> near future? I believe that this feature is quite important.
>
> On Friday, March 27, 2015 at 11:56:59 AM UTC+3, Patrik Nordwall wrote:
> Perhaps you can use mapAsync and ask the actor, instead of using
> ActorSubscriber, ActorPublisher?
>
> Later, there will be an AsyncStage that might be useful.
>
> /Patrik
>
> On Mon, Mar 23, 2015 at 6:56 PM, <[email protected]> wrote:
> Hi! I would like to use a custom actor which implements ActorPublisher and
> ActorSubscriber as a stage in a Flow and pass it to HTTP
> connection.handleWith. However, I can't find any references on how to
> achieve this, and currently I'm using something like this:
>
>   val createProcessor = (system: ActorSystem) => {
>     val processorRef = system.actorOf(Props[ProcessorActor])
>     val sub = Sink(ActorSubscriber[HttpRequest](processorRef))
>     val pub = Source(ActorPublisher[HttpResponse](processorRef))
>     Flow()(b => (b.add(sub), b.add(pub)))
>   }
>
> and call this function for each incoming connection.
>
> Are there any better alternatives, or is this approach OK?
>
> Thanks in advance!
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
> --
>
> Patrik Nordwall
> Typesafe <http://typesafe.com/> - Reactive apps on the JVM
> Twitter: @patriknw
>
> --
> Cheers,
> Konrad 'ktoso' Malawski
> Akka <http://akka.io/> @ Typesafe <http://typesafe.com/>

Dr. Roland Kuhn
Akka Tech Lead
Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
twitter: @rolandkuhn <http://twitter.com/#!/rolandkuhn>
