Hi Jeoren.
On Fri, May 8, 2015 at 2:03 PM, Jeroen Rosenberg <[email protected]
> wrote:
> I'm trying to consume a stream from a streaming API (based on akka-http
> and akka-streams 1.0-RC2). I'm currently using spray client, since I
> couldn't figure out how to do it with Akka Http client yet, but this is
> another topic (some pointers are appreciated, though). Anyway, I'm creating
> a FlowGraph to process the chunks of my stream. I'm using a few junctions,
> such as Balance and Merge. Here's a simplified version of the code.
>
> val source = Source(ActorPublisher[HttpData.NonEmpty](someProcessor))
>
> FlowGraph.closed() { implicit b =>
> import FlowGraph.Implicits._
>
> alancer = Balance[HttpData.Nonval Embpty](2).shape
> val merger = Merge[Try[Map[String,Any]]](2).shape
>
> source ~> balancer.in
>
> balancer.out(0) ~> merger.in(0)
> balancer.out(1) ~> merger.in(1)
> merger.out ~> Sink.ignore
>
> }.run()
>
>
> When I try to run this I'm getting:
> java.lang.IllegalArgumentException: requirement failed: The input port [
> UniformFanOut.in] is not part of the underlying graph.
>
>
Because you have not added them to the graph.
> When I explicitly call .add on the builder to add the junctions to the
> Graph it works (see code below):
>
> val source = Source(ActorPublisher[HttpData.NonEmpty](someProcessor))
>
> FlowGraph.closed() { implicit b =>
> import FlowGraph.Implicits._
>
> val balancer = b.add(Balance[HttpData.NonEmpty](2))
> val merger = b.add(Merge[Try[Map[String,Any]]](2))
>
> source ~> balancer.in
>
> balancer.out(0) ~> merger.in(0)
> balancer.out(1) ~> merger.in(1)
> merger.out ~> Sink.ignore
>
> }.run()
>
>
> Is this really necessary?
>
Yes.
> If so, why?
>
Because otherwise they are not made part of the graph. The builder somehow
need to know that you want to add them.
For example:
balancer = Balance[HttpData.Nonval Empty](2) // creates a standalone graph
that is a balancer with two output ports. The builder have no clue about
this graph, this is a standalone thing.
balancer1 = b.add(balancer) // embeds a copy of balancer into the graph
being built
balancer2 = b.add(balancer) // embeds another copy of balancer into the
graph
// balancer1 is not the same instance as balancer2
-Endre
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.