I'm trying to consume a stream from a streaming API (based on akka-http and
akka-streams 1.0-RC2). I'm currently using the spray client because I haven't
figured out how to do this with the Akka HTTP client yet; that is a separate
topic, though pointers are appreciated. I'm creating a FlowGraph to process
the chunks of my stream, using a few junctions such as Balance and Merge.
Here's a simplified version of the code:
    val source = Source(ActorPublisher[HttpData.NonEmpty](someProcessor))

    FlowGraph.closed() { implicit b =>
      import FlowGraph.Implicits._

      val balancer = Balance[HttpData.NonEmpty](2).shape
      val merger = Merge[Try[Map[String,Any]]](2).shape

      source ~> balancer.in
      balancer.out(0) ~> merger.in(0)
      balancer.out(1) ~> merger.in(1)
      merger.out ~> Sink.ignore
    }.run()
When I try to run this I'm getting:
    java.lang.IllegalArgumentException: requirement failed: The input port [UniformFanOut.in] is not part of the underlying graph.
When I explicitly call .add on the builder to add the junctions to the
graph, it works (see code below):
    val source = Source(ActorPublisher[HttpData.NonEmpty](someProcessor))

    FlowGraph.closed() { implicit b =>
      import FlowGraph.Implicits._

      val balancer = b.add(Balance[HttpData.NonEmpty](2))
      val merger = b.add(Merge[Try[Map[String,Any]]](2))

      source ~> balancer.in
      balancer.out(0) ~> merger.in(0)
      balancer.out(1) ~> merger.in(1)
      merger.out ~> Sink.ignore
    }.run()
Is this really necessary? If so, why?