Hi all,
Finally I found the way to do it. It wasn't easy to understand the API for
the FlowGraph.closed()
<http://doc.akka.io/api/akka-stream-and-http-experimental/1.0/index.html#akka.stream.scaladsl.FlowGraph$>
function, but taking a deeper look I understood that it needs a
*combinedMat* value in addition to a Shape like the collector in the
example. This is how the method is defined in the FlowGraph object:
def closed[Mat, M1, M2](g1: Graph[Shape, M1], g2: Graph[Shape, M2])(
combineMat: (M1, M2) ⇒ Mat)(buildBlock: (Builder[Mat]) ⇒ (Graph.Shape, Graph
.Shape) ⇒ Unit): RunnableGraph[Mat]
And regarding the example, now it's working doing this:
val (out0, out1) = FlowGraph.closed(collector[Event], collector[Event])((m1,
m2) => (m1, m2)) { implicit builder => (out0, out1) =>
val eif = builder.add(eventInputFlow)
Source.single(someEvent) ~> eif.in
eif.out(0) ~> out0
eif.out(1) ~> out1
}.run()
Cheers,
Gabriel.
El lunes, 17 de agosto de 2015, 11:34:05 (UTC+1), Gabriel Volpe escribió:
>
> Thanks for your response Roland,
>
> I've been trying to run the test as you suggested but I had no luck. The
> test works if I do something like this (I'm testing only the first output):
>
> val out = FlowGraph.closed(collector[Event]) { implicit builder => out =>
> val eif = builder.add(eventInputFlow)
> Source.single(someEvent) ~> eif.in
> eif.out(0) ~> out
> eif.out(1) ~> Sink.ignore
> }.run()
>
> But if I try to do it in this way I get a compilation error:
>
> val (out0, out1) = FlowGraph.closed(collector[Event], collector[Event]) {
> implicit builder => (out0, out1) =>
> val eif = builder.add(eventInputFlow)
> Source.single(someEvent) ~> eif.in
> eif.out(0) ~> out0
> eif.out(1) ~> out1
> }.run()
>
> This is the error:
>
> error: missing parameter type
> [ERROR] val (out0, out1) = FlowGraph.closed(collector[Event],
> collector[Event]) { implicit builder => (out0, out1) =>
>
> And BTW I'm using the version 1.0 of Akka Streams.
>
> El sábado, 15 de agosto de 2015, 7:57:52 (UTC+1), Akka Team escribió:
>>
>> Hi Gabriel,
>>
>> testing such a Graph as a black box is rather simple: just attach a
>> Source that emits the test inputs and attach a Sink to each output that
>> collects all emitted elements (e.g. using
>> Flow[...].grouped(1000).toMat(Sink.head)(Keep.right)). When running that
>> flow, the output Futures will eventually be completed and you can assert
>> that they contain the right collections.
>>
>> def collector[T] = Flow[T].grouped(1000).toMat(Sink.head)(Keep.right)
>> val (out0, out1) = FlowGraph.closed(collector[Event], collector[Event]) {
>> implicit builder => (out0, out1) =>
>> val eif = builder.add(eventInputFlow)
>> Source(ev1, ev2, ev3) ~> eif.in
>> eif.out(0) ~> out0
>> eif.out(1) ~> out1
>> }.run()
>>
>> Regards,
>>
>> Roland
>>
>> On Thu, Aug 13, 2015 at 1:44 PM, Gabriel Volpe <[email protected]>
>> wrote:
>>
>>> Hi Viktor,
>>>
>>> I've been reading the official docs (
>>> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-testkit.html)
>>>
>>> and researching into the scala docs but it seems there is no way to test
>>> neither a FanInShape or a FanOutShape. The examples in the documentation
>>> are trivial (only testing a simple Flow).
>>>
>>> I tried to wrap the FanOutShape into two Partial Flow Shapes using
>>> Out(0) for one and Out(1) for the other one. But when I try to use this
>>> FlowShape (using Flow.wrap(theFlowShape)) connected to a Source and a Sink
>>> I got an error like this one:
>>>
>>> Element must not be null, rule 2.13
>>> java.lang.NullPointerException: Element must not be null, rule 2.13
>>> at
>>> akka.stream.impl.ReactiveStreamsCompliance$.elementMustNotBeNullException(ReactiveStreamsCompliance.scala:44)
>>> at
>>> akka.stream.impl.ReactiveStreamsCompliance$.requireNonNullElement(ReactiveStreamsCompliance.scala:70)
>>> at
>>> akka.stream.impl.fusing.OneBoundedInterpreter$State$class.push(Interpreter.scala:287)
>>> at
>>> akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.push(Interpreter.scala:434)
>>> at akka.stream.impl.fusing.Map.onPush(Ops.scala:24)
>>> at akka.stream.impl.fusing.Map.onPush(Ops.scala:23)
>>> at
>>> akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.run(Interpreter.scala:436)
>>> at
>>> akka.stream.impl.fusing.OneBoundedInterpreter$State$class.progress(Interpreter.scala:245)
>>> at
>>> akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.progress(Interpreter.scala:434)
>>> at
>>> akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$execute(Interpreter.scala:580)
>>> at
>>> akka.stream.impl.fusing.OneBoundedInterpreter$State$class.execute(Interpreter.scala:241)
>>> at
>>> akka.stream.impl.fusing.OneBoundedInterpreter$EntryState.execute(Interpreter.scala:666)
>>> at akka.stream.stage.AbstractStage.enterAndPush(Stage.scala:66)
>>> at
>>> akka.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:157)
>>> at
>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>> at akka.stream.impl.SubReceive.apply(Transfer.scala:16)
>>> at akka.stream.impl.SubReceive.apply(Transfer.scala:12)
>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>> at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12)
>>> at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>> at
>>> akka.stream.impl.fusing.ActorInterpreter.aroundReceive(ActorInterpreter.scala:366)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>> at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>>
>>> El jueves, 13 de agosto de 2015, 12:09:45 (UTC+1), √ escribió:
>>>>
>>>> Hi Gabriel,
>>>>
>>>> what did you try and in what way doesn't it do what you expected?
>>>>
>>>> On Thu, Aug 13, 2015 at 12:39 PM, Gabriel Volpe <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi hakkers!
>>>>>
>>>>> I found difficult to test a Partial Flow Graph. I can test the
>>>>> different single pieces, but not the whole partial flow. I'm separating
>>>>> the
>>>>> flows depending on it's functionality and I wish to test every partial
>>>>> flow
>>>>> like a black box.
>>>>>
>>>>> For instance, given the following partial flow:
>>>>>
>>>>> lazy val eventInputFlow = FlowGraph.partial() { implicit builder =>
>>>>> val headerEnricherFlow = builder.add(Flow[Event] map (e =>
>>>>> applyHeaders(e)))
>>>>> val eventFilter = builder.add(eventFilterFlow)
>>>>> val eventOriginFilter = builder.add(eventOriginFilterFlow)
>>>>> val eventDeletedLogger = builder.add(eventDeletedLoggerFlow)
>>>>> val sinkIgnore = builder.add(Sink.ignore)
>>>>>
>>>>> headerEnricherFlow ~> eventFilter
>>>>> eventFilter.out(0) ~> eventOriginFilter
>>>>> eventFilter.out(1) ~> eventDeletedLogger ~> sinkIgnore
>>>>>
>>>>> UniformFanOutShape(headerEnricherFlow.inlet, eventOriginFilter.out(0
>>>>> ), eventOriginFilter.out(1))
>>>>> }.named("eventInputFlow")
>>>>>
>>>>> It would be nice to test the 1 Input and the 2 outputs of this graph.
>>>>>
>>>>> I don't really know how to achieve this.
>>>>>
>>>>> Any help would be appreciated.
>>>>>
>>>>> Thanks,
>>>>> Gabriel.
>>>>>
>>>>> --
>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>> >>>>>>>>>> Check the FAQ:
>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>> >>>>>>>>>> Search the archives:
>>>>> https://groups.google.com/group/akka-user
>>>>> ---
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "Akka User List" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>> an email to [email protected].
>>>>> To post to this group, send email to [email protected].
>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> √
>>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at http://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
>>
>> --
>> Akka Team
>> Typesafe - Reactive apps on the JVM
>> Blog: letitcrash.com
>> Twitter: @akkateam
>>
>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.