I think the key point is that the materialized value (the Future[Long] in
this case) is not a stream element, so it never flows through the graph.
In other words, you can't use a broadcast/zip to get at the materialized
values.
You'll need to capture the materialized value when the stream is
materialized, not when the flow is constructed.
I don't know your exact logic, but here's a quick stab at your problem in
the form of a ScalaTest. I create the sink as a separate step and added
some verbose type annotations to make things clear. Imports and the like
are omitted for brevity.
Note the materialized value combiner function (m1, m2) => (m1, m2). It is
what gets both materialized values out of the sink.

def outputSink(outStream1: OutputStream, outStream2: OutputStream):
    Sink[ByteString, (Future[Long], Future[Long])] = {
  import akka.stream.scaladsl.FlowGraph.Implicits._

  val outSink1 = OutputStreamSink(() => outStream1)
  val outSink2 = OutputStreamSink(() => outStream2)

  // The combiner pairs up the materialized values of the two sinks
  Sink(outSink1, outSink2)((m1, m2) => (m1, m2)) { implicit builder =>
    (out1, out2) => {
      // Fan each incoming ByteString out to both sinks
      val bcast = builder.add(Broadcast[ByteString](2))
      bcast ~> out1
      bcast ~> out2
      // The broadcast inlet becomes the inlet of the combined sink
      (bcast.in)
    }
  }
}

"Flow" should "produce materialized values" in {
  val data = "Some String"
  val source = InputStreamSource(() => new ByteArrayInputStream(data.getBytes))
  val outStream1 = new ByteArrayOutputStream()
  val outStream2 = new ByteArrayOutputStream()
  val sink = outputSink(outStream1, outStream2)

  // Keep.right keeps the sink's materialized value, i.e. the pair of futures
  val runnable: RunnableFlow[(Future[Long], Future[Long])] =
    source.toMat(sink)(Keep.right)

  // The futures only become available once the stream is run
  val (size1, size2) = runnable.run()

  whenReady(size1) { size =>
    size shouldBe data.getBytes.length
  }
  whenReady(size2) { size =>
    size shouldBe data.getBytes.length
  }
}
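
Since your checker only needs to confirm that the two sizes are equal, one
option is to combine the two futures after run() with plain
scala.concurrent, so no extra graph stage is needed. This is only a rough
sketch: SizeCheck and sizesMatch are names I made up for illustration (they
are not part of Akka), and I'm assuming an implicit ExecutionContext is in
scope.

```scala
import scala.concurrent.{ExecutionContext, Future}

object SizeCheck {
  // Hypothetical helper, not an Akka API: completes with true when both
  // byte counts are equal, and fails if either future fails.
  def sizesMatch(size1: Future[Long], size2: Future[Long])
                (implicit ec: ExecutionContext): Future[Boolean] =
    size1.zip(size2).map { case (a, b) => a == b }
}
```

In the test you could then write something like
whenReady(SizeCheck.sizesMatch(size1, size2)) { _ shouldBe true }.
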
Regards,
Lance
On Saturday, June 20, 2015 at 8:13:08 AM UTC-4, Michael Hamrah wrote:
>
> I'm working with an OutputStreamSink (
> http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-RC3/index.html#akka.stream.io.OutputStreamSink$)
>
> which materializes a Future[Long]. I'd like to use an OutputStreamSink as
> part of a flow, but I can't get the types to line up.
>
> Ideally my graph will look something like this; it splits an input stream,
> zips the result, and checker checks to make sure the future[long]'s are
> equal.
>
> ```
> in ~> broadcast ~> outputsink1 ~> zip.in0 ~> checker
> ~> broadcast ~> outputsink2 ~> zip.in1
> ```
>
> any ideas? It seems like I need to rewrite OutputStreamSink from a Sink to
> a Flow, or use a SubscriberActor with a PublisherActor, or possibly write
> some sort of PushPullStage. Seems like there should be an easy way to
> "lift" a sink with a materialized value to a flow stage.
>
> Thanks,
>
> Mike
>