Hi Michael, leaving aside the API for a moment, the question is how the streams should behave. The actual live streams are not immutable but are running things that need to receive data from the original request and send this data out with another request. (I haven't completely understood in which way you want to "split" the source, broadcasting seems to hint that you are going to duplicate data and maybe filter afterwards? It may not matter at this point.)
Let's assume you want to duplicate the stream. How should the duplicated streams behave wrt backpressure when they are actually consumed? The simplest strategy is to assume that you want them to backpressure together, i.e. the slower consumer will set the speed for both of them and the two "stream pointers" only differ as far as the bounded buffers after the broadcast allow. Is that what you are after? So far for the theoretical operation of the streams. The problem with the API is, that the FlowGraph API requires that the whole graph is materialized together. This means that you cannot possibly return two separate unmaterialized Sources from a graph building operation which is what you need for creating two HttpEntities. One workaround could be to wire the two outputs to two `Sink.publishers` during graph building, and then directly run the graph to obtain two publishers which you then need to wrap into two sources which you can put into two new requests. This works because PublisherSinks can already be materialized on their sink side while still being waiting for a subscriber on the Publisher side. In any case, processing will only start when both sources are running (aside from buffers already filling up before that). Does that make sense? If you actually meant splitting the stream by some criteria, i.e. you will first receive data for creating the first request and afterwards for creating the second request there may another solution where you try to create a `Flow[ByteString, HttpRequest]` and feed that into something that actually runs the requests. The challenging thing here is that you need to create an intermediate operation `Flow[ByteString, Source[ByteString]]` which can only be done using groupBy, splitAfter, or splitWhen combinators which, however, cannot be stateful, so you may need some boilerplate to get that scheme implemented. Cheers, Johannes On Monday, July 13, 2015 at 7:04:21 PM UTC+2, Michael Hamrah wrote: > > I'm having trouble wiring the following logical flow: > > akka-http route -> grab request entity's source stream -> split the source > stream in two -> pass the source stream to two new http requests with the > source stream in the http entity. > > I can easily grab the incoming request entity's source stream. I can then > create a FlowGraph with a Broadcast stage to split it. However, what I > can't seem to do > is wire up the rest. I need two Source[ByteString]s to build the > HttpEntity to make the upstream request. Using broadcast.out(0) gives me an > outlet which I can't convert to a Stream; if I create a Flow, I have a > bunch of ByteStrings generating sources. > > I believe I need to "wrap" the entire plit incoming stream--not the > specific byte string chunks-in a new http request, but not sure how. > > Any ideas? > > Thanks, > > Mike > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.