I looked throught the code and AFAIU it's not very hard to add support for 
this feature. It's just a matter of adding another *StageModule* subclass 
which takes *Props* argument, implementing handling of this *Module* inside 
*ActorProcessorFactory.props* method(i.e. something like this:

def props(materializer: ActorFlowMaterializerImpl, op: StageModule, 
parentAttributes: OperationAttributes): (Props, Any) = {
  val att = parentAttributes and op.attributes
  // USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW
  // Also, otherwise the attributes will not affect the settings properly!
  val settings = calcSettings(att)(materializer.settings)
  op match {
    case ActorProcessor(props) ⇒ (props, ())
    ...

) and additional Flow.apply(props: Props) which would create ActorProcessor 
stage(just like apply(props: Props) for Sink and Source). It would allow us 
to integrate non-reactive stream components(e.g. work pulling pattern 
implementation) right inside the Flow DSL without additional overhead. What 
do you think?

Thanks in advance!

On Sunday, March 29, 2015 at 5:15:42 AM UTC+3, [email protected] wrote:
>
> Just to clarify: are there any plans to add support for custom actor 
> processors inside Flow DSL(e.g. *Flow(Props[ProcessorActor])*) in near 
> future? I believe that this feature is quite important.
>
> On Friday, March 27, 2015 at 11:56:59 AM UTC+3, Patrik Nordwall wrote:
>>
>> Perhaps you can use mapAsync and ask the actor, instead of 
>> using ActorSubscriber, ActorPublisher?
>>
>> Later, there will be an AsyncStage that might be useful.
>>
>> /Patrik
>>
>> On Mon, Mar 23, 2015 at 6:56 PM, <[email protected]> wrote:
>>
>>> Hi! I would like to use custom actor which implements ActorPublisher and 
>>> ActorSubscriber as a stage in Flow and pass it to HTTP 
>>> connection.handleWith. However I can't find any references how to achieve 
>>> this and currently I'm using something like this: 
>>>
>>> val createProcessor = (system: ActorSystem) => {
>>>   val processorRef = system.actorOf(Props[ProcessorActor])
>>>   val sub = Sink (ActorSubscriber[HttpRequest] (processorRef) )
>>>   val pub = Source (ActorPublisher[HttpResponse] (processorRef) )
>>>   Flow () (b => (b.add(sub), b.add(pub)))
>>> }
>>>
>>> and call this function for each incoming connection.
>>>
>>> Are there any better alternatives or this approach is ok?
>>>
>>> Thanks in advance!
>>>
>>> -- 
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: 
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives: 
>>> https://groups.google.com/group/akka-user
>>> --- 
>>> You received this message because you are subscribed to the Google 
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send 
>>> an email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at http://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
>>
>> -- 
>>
>> Patrik Nordwall
>> Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
>> Twitter: @patriknw
>>
>> 

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to