Actually, there is a way to access the materialized value of a graph from inside the graph itself. See: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-graphs.html#Accessing_the_materialized_value_inside_the_Graph
-Endre

On Sun, Jun 21, 2015 at 7:43 PM, Lance Arlaus <[email protected]> wrote:

> I think one of the keys here is that the materialized value (the
> Future[Long] in this case) is not an element, per se, so it won't get
> streamed inline.
> In other words, don't expect to do a broadcast/zip to get the materialized
> values.
> You'll need to capture the materialized value when the stream is
> materialized, not when the flow is constructed.
>
> I don't know your exact logic, but here's a quick stab at your problem in
> the form of a ScalaTest. I'm creating the sink as a separate step and I
> added some verbose typing, etc. to illustrate and hopefully make things
> clear. Of course, imports and such omitted for brevity.
>
> Note the use of the materialized value combiner function
> (m1, m2) => (m1, m2). This will get you both materialized values out of
> the sink.
>
> def outputSink(outStream1: OutputStream, outStream2: OutputStream):
>     Sink[ByteString, (Future[Long], Future[Long])] = {
>   import akka.stream.scaladsl.FlowGraph.Implicits._
>
>   val outSink1 = OutputStreamSink(() => outStream1)
>   val outSink2 = OutputStreamSink(() => outStream2)
>
>   Sink(outSink1, outSink2)((m1, m2) => (m1, m2)) { implicit builder =>
>     (out1, out2) => {
>       val bcast = builder.add(Broadcast[ByteString](2))
>
>       bcast ~> out1
>       bcast ~> out2
>
>       (bcast.in)
>     }
>   }
> }
>
> "Flow" should "produce materialized values" in {
>   val data = "Some String"
>   val source = InputStreamSource(() => new ByteArrayInputStream(data.getBytes))
>   val outStream1 = new ByteArrayOutputStream()
>   val outStream2 = new ByteArrayOutputStream()
>   val sink = outputSink(outStream1, outStream2)
>
>   val runnable: RunnableFlow[(Future[Long], Future[Long])] =
>     source.toMat(sink)(Keep.right)
>
>   val (size1, size2) = runnable.run()
>
>   whenReady(size1) { size =>
>     size shouldBe data.getBytes.length
>   }
>   whenReady(size2) { size =>
>     size shouldBe data.getBytes.length
>   }
> }
>
> Regards,
> Lance
>
> On Saturday, June 20, 2015 at 8:13:08 AM UTC-4, Michael Hamrah wrote:
>>
>> I'm working with an OutputStreamSink (
>> http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-RC3/index.html#akka.stream.io.OutputStreamSink$)
>> which materializes a Future[Long]. I'd like to use an OutputStreamSink as
>> part of a flow, but I can't get the types to line up.
>>
>> Ideally my graph will look something like this: it splits an input
>> stream, zips the results, and a checker verifies that the two
>> Future[Long] values are equal.
>>
>> ```
>> in ~> broadcast ~> outputsink1 ~> zip.in0 ~> checker
>>    ~> broadcast ~> outputsink2 ~> zip.in1
>> ```
>>
>> Any ideas? It seems like I need to rewrite OutputStreamSink from a Sink
>> to a Flow, or use a SubscriberActor with a PublisherActor, or possibly
>> write some sort of PushPullStage. Seems like there should be an easy way to
>> "lift" a sink with a materialized value to a flow stage.
>>
>> Thanks,
>>
>> Mike
>>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
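
For the "checker" step from the original question, one option is to compare the two byte counts after materialization rather than inside the graph. The following is only a rough sketch using the Scala standard library, not Akka: `SizeCheck` and `sizesMatch` are hypothetical names introduced here for illustration, and the two futures in `main` are stand-ins for the `(size1, size2)` pair that `runnable.run()` returns in the quoted test.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka Streams.
object SizeCheck {
  // Completes with true when both byte counts are equal.
  // If either future fails, the resulting future fails as well.
  def sizesMatch(size1: Future[Long], size2: Future[Long]): Future[Boolean] =
    size1.zip(size2).map { case (a, b) => a == b }

  def main(args: Array[String]): Unit = {
    // Stand-in values; in the quoted test these would be the
    // materialized values of the two OutputStreamSinks.
    val expected = "Some String".getBytes.length.toLong
    val size1 = Future.successful(expected)
    val size2 = Future.successful(expected)

    // Blocking is only for the demo; in a test, whenReady would be used instead.
    println(Await.result(sizesMatch(size1, size2), 1.second))
  }
}
```

Combining the futures this way keeps the comparison outside the stream, which matches the point above that the materialized values are not stream elements.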
