Hi Michael,

Leaving aside the API for a moment, the question is how the streams should
behave. The actual live streams are not immutable values but running
entities that need to receive data from the original request and send it
out with another request. (I haven't completely understood in which way you
want to "split" the source; broadcasting seems to hint that you are going
to duplicate the data and maybe filter afterwards? It may not matter at
this point.)

Let's assume you want to duplicate the stream. How should the duplicated
streams behave with respect to backpressure when they are actually
consumed? The simplest strategy is to have them backpressure together,
i.e. the slower consumer sets the speed for both of them, and the two
"stream pointers" can only drift apart as far as the bounded buffers after
the broadcast allow. Is that what you are after?
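
To make the coupled behaviour concrete, here is a minimal sketch. It
assumes the later GraphDSL API (Akka 2.6) rather than the FlowGraph API
discussed in this thread, and `CoupledBroadcast`, `run`, and `slack` are
names invented for the illustration. One branch is throttled to play the
slower consumer; the `buffer` stages stand for the bounded buffers after
the broadcast.

```scala
import akka.actor.ActorSystem
import akka.stream.{ClosedShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

object CoupledBroadcast {
  // A sink that counts the elements it receives.
  private def counter: Sink[Int, Future[Int]] =
    Sink.fold[Int, Int](0)((n, _) => n + 1)

  // Broadcasts 1..elements to a fast and a slow consumer and returns
  // how many elements each of them saw.
  def run(elements: Int, slack: Int)(implicit system: ActorSystem): (Future[Int], Future[Int]) =
    RunnableGraph.fromGraph(
      GraphDSL.create(counter, counter)((_, _)) { implicit b => (fast, slow) =>
        import GraphDSL.Implicits._

        val bcast = b.add(Broadcast[Int](2))
        // Bounded buffer: the only room the two branches have to drift apart.
        val fastBranch = Flow[Int].buffer(slack, OverflowStrategy.backpressure)
        // Same buffer, followed by a throttle (100 elements per second)
        // that makes this branch the slow one.
        val slowBranch = Flow[Int]
          .buffer(slack, OverflowStrategy.backpressure)
          .throttle(100, 1.second)

        Source(1 to elements) ~> bcast.in
        bcast.out(0) ~> fastBranch ~> fast.in
        bcast.out(1) ~> slowBranch ~> slow.in
        ClosedShape
      }).run()
}
```

Both branches see every element, but once the slow branch's buffer is
full the Broadcast stops pulling from upstream, so the fast branch can
only run ahead by roughly `slack` elements plus the stages' small internal
buffers.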

So much for the theoretical operation of the streams. The problem with the
API is that the FlowGraph API requires the whole graph to be materialized
together. This means that you cannot return two separate unmaterialized
Sources from a graph building operation, which is what you need for
creating two HttpEntities. One workaround could be to wire the two outputs
to two `Sink.publisher`s during graph building and then run the graph
directly to obtain two publishers, which you then wrap into two sources
that you can put into two new requests. This works because a PublisherSink
can already be materialized on its sink side while it is still waiting for
a subscriber on the Publisher side. In any case, processing will only start
when both sources are running (aside from buffers already filling up before
that).
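
A rough sketch of that workaround follows. It is written against the later
GraphDSL API (Akka 2.6), where `Sink.asPublisher` and `Source.fromPublisher`
play the roles of the publisher sink and the wrapping step; with the
FlowGraph API the entry points are named differently. `DuplicateEntity` and
`duplicate` are hypothetical names, not part of any library.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source}
import akka.util.ByteString

object DuplicateEntity {
  // Runs a broadcast graph right away and hands back two sources that
  // each deliver every chunk of `upstream`.
  def duplicate(upstream: Source[ByteString, Any])(implicit system: ActorSystem)
      : (Source[ByteString, NotUsed], Source[ByteString, NotUsed]) = {
    val graph = RunnableGraph.fromGraph(
      GraphDSL.create(
        Sink.asPublisher[ByteString](fanout = false),
        Sink.asPublisher[ByteString](fanout = false)
      )((_, _)) { implicit b => (left, right) =>
        import GraphDSL.Implicits._

        val bcast = b.add(Broadcast[ByteString](2))
        upstream ~> bcast.in
        bcast.out(0) ~> left.in
        bcast.out(1) ~> right.in
        ClosedShape
      })

    // Materializing yields the two publishers; nothing flows until both
    // of them have a subscriber with demand.
    val (publisherA, publisherB) = graph.run()
    (Source.fromPublisher(publisherA), Source.fromPublisher(publisherB))
  }
}
```

Each returned source could then be used as the data of one outgoing
entity. Because the publishers are created with `fanout = false`, each
source can be run only once, and both have to be running for either of
them to make progress.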

Does that make sense?

If you actually meant splitting the stream by some criterion, i.e. you will
first receive the data for creating the first request and afterwards the
data for creating the second request, there may be another solution: create
a `Flow[ByteString, HttpRequest]` and feed that into something that
actually runs the requests. The challenging part is that you need an
intermediate operation `Flow[ByteString, Source[ByteString]]`, which can
only be built with the groupBy, splitAfter, or splitWhen combinators.
Those, however, cannot be stateful, so you may need some boilerplate to get
that scheme implemented.
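
As an illustration of the intermediate step only, here is a sketch against
the later Akka 2.6 API, where `splitWhen` yields substreams and
`prefixAndTail(0)` followed by `concatSubstreams` turns them back into a
stream of sources. The split criterion (a chunk starting with "--") and
the names `SplitIntoParts`, `parts`, and `collectParts` are made up; a real
criterion would depend on your protocol, and concatenating each part here
merely stands in for running one request per part.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

object SplitIntoParts {
  // Hypothetical, stateless criterion: such a chunk opens the next part.
  val startsNewPart: ByteString => Boolean =
    chunk => chunk.startsWith(ByteString("--"))

  // Emits one inner source per part. An inner source has to be consumed
  // before the next one can be emitted.
  val parts: Flow[ByteString, Source[ByteString, NotUsed], NotUsed] =
    Flow[ByteString]
      .splitWhen(startsNewPart)
      .prefixAndTail(0)
      .map { case (_, tail) => tail }
      .concatSubstreams

  // Stand-in for "run one request per part": concatenate each part.
  def collectParts(chunks: List[ByteString])(implicit system: ActorSystem)
      : Future[Seq[ByteString]] =
    Source(chunks)
      .via(parts)
      .mapAsync(1)(part => part.runFold(ByteString.empty)(_ ++ _))
      .runWith(Sink.seq)
}
```

If the criterion needs state (for example counting bytes), one option is
to tag each chunk with a "starts new part" flag in a stateful stage placed
before `splitWhen`, so that the predicate itself stays stateless.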

Cheers,
Johannes


On Monday, July 13, 2015 at 7:04:21 PM UTC+2, Michael Hamrah wrote:
>
> I'm having trouble wiring the following logical flow:
>
> akka-http route -> grab request entity's source stream -> split the source 
> stream in two -> pass the source stream to two new http requests with the 
> source stream in the http entity.
>
> I can easily grab the incoming request entity's source stream. I can then 
> create a FlowGraph with a Broadcast stage to split it. However, what I
> can't seem to do is wire up the rest. I need two Source[ByteString]s to
> build the HttpEntity to make the upstream request. Using broadcast.out(0)
> gives me an outlet which I can't convert to a Stream; if I create a Flow,
> I have a bunch of ByteStrings generating sources.
>
> I believe I need to "wrap" the entire split incoming stream (not the
> specific byte string chunks) in a new http request, but I'm not sure how.
>
> Any ideas?
>
> Thanks,
>
> Mike
>
