Hi Jeroen

On Fri, May 8, 2015 at 2:55 PM, Jeroen Rosenberg <jeroen.rosenb...@gmail.com
> wrote:

> Thanks for the swift reply. So creating junctions will actually create
> Graphs which you explicitly need to connect. This must have been changed
> then, because I remember in earlier versions you didn't need to make any
> explicit calls to the builder and you could just wire/connect them using
> the ~> operator from FlowGraphImplicits.
>

No, that was never true. You always had to assign them to variables first
in order to use the ~> operator, since you cannot ever connect up a proper
junction (more than 2 ports) in just one line. The difference was only that
the apply() methods of Balance/Merge, etc, took an implicit builder all the
time, and automatically added the junctions to the graph. You could not use
these junctions outside of a builder block ever.


> Why can't this make the connection implicitly?
>

Because you can now use graphs in much richer ways than before. For
example, you can pass junctions (or any other graph) "statically" as
parameters to the closed() method and thereby use their materialized
values. No builder is needed at that point.
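
A minimal sketch of the "pass a graph to closed()" case, assuming the
1.0-RC2 scaladsl used elsewhere in this thread. It uses Sink.head as the
"any other graph" because its materialized value is easy to see; the object
and value names are only illustrative.

```scala
import akka.stream.scaladsl._

object MaterializedValueSketch {
  // Built outside any builder block: a free-standing graph.
  val head = Sink.head[Int]

  // closed(head) imports the sink into the graph and hands its shape
  // to the block, so no b.add call is needed for it.
  val graph = FlowGraph.closed(head) { implicit b => sink =>
    import FlowGraph.Implicits._
    Source(1 to 3) ~> sink.inlet
  }
  // With an implicit materializer in scope, graph.run() returns the
  // sink's materialized value (a Future[Int] for Sink.head).
}
```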

Also, forcing the apply() functions to take an implicit builder would mean
that you could not construct these graphs outside a builder block, which would
exclude many valid (and useful) use cases. For example, you can now write a
function that accepts a graph of a certain shape, say with 1 input and
2 output ports. You can then pass any graph of that shape to the function,
whether a custom composite one or a simple atomic one like Balance.
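
As a rough sketch of such a function, assuming the 1.0-RC2 scaladsl and the
Graph / UniformFanOutShape types from akka.stream: drainTwoWays is a
hypothetical helper, not part of the library. The shape type only says
"uniform fan-out", so the caller is trusted to supply at least two outputs.

```scala
import akka.stream.{ Graph, UniformFanOutShape }
import akka.stream.scaladsl._

object ShapedGraphSketch {
  // Hypothetical helper: wires a source into any uniform fan-out graph
  // and drains its first two outputs. Assumes fanOut has >= 2 outputs.
  def drainTwoWays[T](source: Source[T, _],
                      fanOut: Graph[UniformFanOutShape[T, T], _]) =
    FlowGraph.closed() { implicit b =>
      import FlowGraph.Implicits._
      val f = b.add(fanOut) // embed a fresh instance of the passed graph
      source ~> f.in
      f.out(0) ~> Sink.ignore
      f.out(1) ~> Sink.ignore
    }

  // The same function accepts different atomic junctions.
  val balanced  = drainTwoWays(Source(1 to 10), Balance[Int](2))
  val broadcast = drainTwoWays(Source(1 to 10), Broadcast[Int](2))
}
```

Neither graph is run here; running one needs an implicit materializer in
scope.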

We *do* add Flows and Sources implicitly in builder blocks, but the only
reason that is possible is that they can be wired up in one line:

x ~> myFlow ~> y

But a junction is never fully connectable on one line, so you need to store
it in a variable. So now you can

 - create and accept free-floating, independent graphs
 - embed them into another graph (arbitrarily many times) using b.add
 - connect the instances as you see fit (each embedding is a new instance)

In fact, you can write the flow example like this:
val f = b.add(flow)
x ~> f.inlet
f.outlet ~> y
// or x ~> f ~> y
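
The three points in the list above (free-floating graph, embedded several
times, each embedding wired separately) could look roughly like this,
assuming the 1.0-RC2 scaladsl. The object and value names are only
illustrative.

```scala
import akka.stream.scaladsl._

object EmbedTwiceSketch {
  // Free-floating junction: no builder is involved yet.
  val balance = Balance[Int](2)

  val graph = FlowGraph.closed() { implicit b =>
    import FlowGraph.Implicits._
    // Each b.add embeds a separate instance with its own ports.
    val b1 = b.add(balance)
    val b2 = b.add(balance)
    val merge = b.add(Merge[Int](4))

    Source(1 to 10)  ~> b1.in
    Source(11 to 20) ~> b2.in

    b1.out(0) ~> merge.in(0)
    b1.out(1) ~> merge.in(1)
    b2.out(0) ~> merge.in(2)
    b2.out(1) ~> merge.in(3)

    merge.out ~> Sink.ignore
  }
}
```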

-Endre


>
> On Friday, May 8, 2015 at 2:37:30 PM UTC+2, Akka Team wrote:
>>
>> Hi Jeroen.
>>
>> On Fri, May 8, 2015 at 2:03 PM, Jeroen Rosenberg <jeroen.r...@gmail.com>
>> wrote:
>>
>>> I'm trying to consume a stream from a streaming API (based on akka-http
>>> and akka-streams 1.0-RC2). I'm currently using spray client, since I
>>> couldn't figure out how to do it with Akka Http client yet, but this is
>>> another topic (some pointers are appreciated, though). Anyway, I'm creating
>>> a FlowGraph to process the chunks of my stream. I'm using a few junctions,
>>> such as Balance and Merge. Here's a simplified version of the code.
>>>
>>> val source = Source(ActorPublisher[HttpData.NonEmpty](someProcessor))
>>>
>>> FlowGraph.closed() { implicit b =>
>>>   import FlowGraph.Implicits._
>>>
>>>   val balancer = Balance[HttpData.NonEmpty](2).shape
>>>   val merger = Merge[Try[Map[String,Any]]](2).shape
>>>
>>>   source ~> balancer.in
>>>
>>>                    balancer.out(0) ~> merger.in(0)
>>>                    balancer.out(1) ~> merger.in(1)
>>>                                                   merger.out ~> Sink.ignore
>>>
>>> }.run()
>>>
>>>
>>> When I try to run this I'm getting:
>>> java.lang.IllegalArgumentException: requirement failed: The input port [
>>> UniformFanOut.in] is not part of the underlying graph.
>>>
>>>
>> Because you have not added them to the graph.
>>
>>
>>> When I explicitly call .add on the builder to add the junctions to the
>>> Graph it works (see code below):
>>>
>>> val source = Source(ActorPublisher[HttpData.NonEmpty](someProcessor))
>>>
>>> FlowGraph.closed() { implicit b =>
>>>   import FlowGraph.Implicits._
>>>
>>>   val balancer = b.add(Balance[HttpData.NonEmpty](2))
>>>   val merger = b.add(Merge[Try[Map[String,Any]]](2))
>>>
>>>   source ~> balancer.in
>>>
>>>                    balancer.out(0) ~> merger.in(0)
>>>                    balancer.out(1) ~> merger.in(1)
>>>                                                   merger.out ~> Sink.ignore
>>>
>>> }.run()
>>>
>>>
>>> Is this really necessary?
>>>
>>
>> Yes.
>>
>>
>>> If so, why?
>>>
>>
>> Because otherwise they are not made part of the graph. The builder
>> somehow needs to know that you want to add them.
>>
>> For example:
>>
>> val balancer = Balance[HttpData.NonEmpty](2) // creates a standalone
>> graph that is a balancer with two output ports. The builder has no clue
>> about this graph; it is a standalone thing.
>>
>> val balancer1 = b.add(balancer) // embeds a copy of balancer into the graph
>> being built
>> val balancer2 = b.add(balancer) // embeds another copy of balancer into the
>> graph
>>
>> // balancer1 is not the same instance as balancer2
>>
>> -Endre
>>
>>
>>>
>>>  --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+...@googlegroups.com.
>>> To post to this group, send email to akka...@googlegroups.com.
>>> Visit this group at http://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
>>
>> --
>> Akka Team
>> Typesafe - Reactive apps on the JVM
>> Blog: letitcrash.com
>> Twitter: @akkateam
>>



-- 
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

