In case it helps, here is how I dealt with merging n streams:
https://gist.github.com/williamleferrand/6d27d8826788174c3b4c

I haven't tested it with 1.0-M4, but it works fine with 1.0-M1. It might hurt
your eyes, but I ran into all kinds of issues when trying to make it simpler.
Comments are very welcome!
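
For readers who only want the core idea discussed in this thread (the sources
are already sorted, so compare their heads and emit the smallest one first),
here is a minimal sketch in plain Scala. It is not the code from the gist and
it does not use the akka-streams API at all: it works on ordinary iterators
with a priority queue from the standard library. The names SortedMerge and
mergeSorted are made up for illustration.

```scala
import scala.collection.mutable

object SortedMerge {
  /** Lazily merges any number of already-sorted iterators into one sorted iterator.
    * Illustrative helper only; assumes every source is sorted according to `ord`.
    */
  def mergeSorted[A](sources: Seq[Iterator[A]])(implicit ord: Ordering[A]): Iterator[A] = {
    // Heap entry: (current head of a source, the rest of that source).
    // PriorityQueue is a max-heap, so the ordering is reversed to dequeue the smallest head.
    val byHead: Ordering[(A, Iterator[A])] =
      Ordering.by[(A, Iterator[A]), A](_._1)(ord.reverse)
    val heap = mutable.PriorityQueue.empty[(A, Iterator[A])](byHead)

    // Prime the heap with the first element of every non-empty source.
    sources.foreach(it => if (it.hasNext) heap.enqueue((it.next(), it)))

    new Iterator[A] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): A = {
        val (head, rest) = heap.dequeue()
        // Refill from the source that was just consumed, if it has more elements.
        if (rest.hasNext) heap.enqueue((rest.next(), rest))
        head
      }
    }
  }
}
```

With the two lists from the example quoted below,
SortedMerge.mergeSorted(Seq(Iterator(1, 2, 2, 3, 7, 8), Iterator(2, 3, 3, 6, 7))).toList
gives List(1, 2, 2, 2, 3, 3, 3, 6, 7, 7, 8). Only one element per source is
buffered at a time, so the output can advance no faster than the slowest
source, which matches the behaviour described further down the thread.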

On Mon, Mar 23, 2015 at 7:20 AM, Ajay Padala <[email protected]> wrote:

> I can find this example for an older version of akka-streams, but not one
> that works with akka-streams 1.0-M4.
>
> In particular, I am having trouble figuring out how to emit the buffered
> element (reference) after both streams have closed.
>
> I have tried to go to this state after both streams have closed, but the
> onInput method is never called. I am not sure how to frame the read condition
> so that it gets called when downstream wants an element:
>
> def produceRefAndClose = State(ReadAll(s.input1, s.input2)) {
>   (ctx, input, elem) ⇒
>     ctx.emit(ref)
>     ctx.finish()
>     SameState
> }
>
>
>
>
> Or maybe I am going about it the wrong way completely. Full code below:
>
> object StreamMerge extends App {
>
>   implicit val actorSystem = ActorSystem("LogFiles")
>   implicit val mat = ActorFlowMaterializer()
>
>   class OrderedMergeShape[A](_init: Init[A] = Name("OrderedMerge")) extends FanInShape[A](_init) {
>     val input1 = newInlet[A]("input1")
>     val input2 = newInlet[A]("input2")
>
>     override protected def construct(init: Init[A]): FanInShape[A] = new OrderedMergeShape(init)
>   }
>
>   class IntOrderMerge
>     extends FlexiMerge[Int, OrderedMergeShape[Int]](
>       new OrderedMergeShape(), OperationAttributes.name("OrderedMerge")) {
>     import FlexiMerge._
>
>
>     override def createMergeLogic(s: OrderedMergeShape[Int]): MergeLogic[Int] = new MergeLogic[Int] {
>       var ref: Int = _
>       var refValid = false
>
>
>       def other(input: InPort): Inlet[Int] = if (input eq s.input1) s.input2 else s.input1
>
>       def getFirstElement = State[Int](ReadAny(s.input1, s.input2)) { (ctx, input, elem) ⇒
>         ref = elem
>         refValid = true
>         ctx.changeCompletionHandling(emitOtherOnClose)
>         readUntilLarger(other(input))
>       }
>
>       override def initialState: State[Int] = getFirstElement
>
>       def readUntilLarger(curInput: Inlet[Int]): State[Int] = State[Int](Read(curInput)) {
>         (ctx, input, elem) ⇒
>           if (elem <= ref) {
>             ctx.emit(elem)
>             SameState
>           }
>           else {
>             ctx.emit(ref)
>             ref = elem
>             readUntilLarger(other(input))
>           }
>       }
>
>       def readRemaining(curInput: Inlet[Int]) = State[Int](Read(curInput)) {
>         (ctx, input, elem) ⇒
>           if (elem <= ref)
>             ctx.emit(elem)
>           else {
>             ctx.emit(ref)
>             ref = elem
>           }
>           SameState
>       }
>
>       def produceRefAndClose = State(ReadAll(s.input1, s.input2)) {
>         (ctx, input, elem) ⇒
>           ctx.emit(ref)
>           ctx.finish()
>           SameState
>       }
>
>       val emitLast = CompletionHandling(
>         onUpstreamFinish = { (ctx, input) ⇒
>           ctx.changeCompletionHandling(defaultCompletionHandling)
>           produceRefAndClose
>         },
>         onUpstreamFailure = { (ctx, _, cause) ⇒
>           ctx.fail(cause)
>           SameState
>         }
>       )
>
>       val emitOtherOnClose = CompletionHandling(
>         onUpstreamFinish = { (ctx, input) ⇒
>           ctx.changeCompletionHandling(emitLast)
>           readRemaining(other(input))
>         },
>         onUpstreamFailure = { (ctx, _, cause) ⇒
>           ctx.fail(cause)
>           SameState
>         }
>       )
>
>
>
>       override def initialCompletionHandling: CompletionHandling = emitOtherOnClose
>     }
>
>   }
>
>   def printIntStream(): Unit = {
>     val srcA = Source(List(1,2,2,3,7,8))
>     val srcB = Source(List(2,3,3,6,7))
>     val sink = Sink.foreach(println)
>
>     val fGraph = FlowGraph.closed() { implicit b ⇒
>       import FlowGraph.Implicits._
>       val intOrderMerge = b.add(new IntOrderMerge)
>       srcA ~> intOrderMerge.input1
>       srcB ~> intOrderMerge.input2
>       intOrderMerge.out ~> sink
>     }
>
>     println("Running flow")
>     fGraph.run()
>   }
>
>   printIntStream()
> }
>
>
>
>
>
> On Wednesday, 10 December 2014 09:51:11 UTC, drewhk wrote:
>>
>>
>>
>> On Tue, Dec 9, 2014 at 8:07 PM, William Le Ferrand <[email protected]>
>> wrote:
>>
>>> Dear List,
>>>
>>> I'm trying to revisit this problem now that streams is in a stable
>>> state, but I don't quite see how to use MergeLogic to perform this sort
>>> merge. Is it actually possible, and would anyone have hints toward a
>>> working solution?
>>>
>>
>> Yes, in the test for FlexiMerge there is a sorting merge. Docs will come
>> soon and will contain a cookbook section with recipes like this.
>>
>> -Endre
>>
>>
>>>
>>> thanks in advance,
>>>
>>> best
>>>
>>> william
>>>
>>> On Wed, Sep 24, 2014 at 7:28 AM, √iktor Ҡlang <[email protected]>
>>> wrote:
>>>
>>>> +1
>>>>
>>>> On Wed, Sep 24, 2014 at 5:07 AM, Endre Varga <[email protected]>
>>>> wrote:
>>>>
>>>>> There will be all kinds of merges available in the future. Akka
>>>>> Streams is still a preview so we change stuff all the time. To get to the
>>>>> point where we can support pluggable merges we needed the graph API and
>>>>> support for n-way fan-in operations first. So stay tuned :)
>>>>>
>>>>> -Endre
>>>>>
>>>>> On Wed, Sep 24, 2014 at 1:43 AM, William Le Ferrand <[email protected]
>>>>> > wrote:
>>>>>
>>>>>> It looks like this:
>>>>>> https://gist.github.com/williamleferrand/c77133af821d5eb278be
>>>>>> (I feel bad looking at it)
>>>>>>
>>>>>> On Tue, Sep 23, 2014 at 4:38 PM, √iktor Ҡlang <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Now it is way more clear! :) So you have 2 sorted streams that you
>>>>>>> want to merge to be one sorted stream.
>>>>>>> Right now I don't think there's a good machinery for doing this, but
>>>>>>> I may be misinformed.
>>>>>>> What does the solution that you came up with look like?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 23, 2014 at 7:29 PM, William Le Ferrand <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Sorry, the source streams are already sorted (that's what I meant
>>>>>>>> earlier by "ordered data"), so you just have to compare the heads of
>>>>>>>> your two sources to decide which element to push to the outbound
>>>>>>>> stream first. Does that make sense?
>>>>>>>>
>>>>>>>> On Tue, Sep 23, 2014 at 4:24 PM, √iktor Ҡlang <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I still don't understand, if we have 2 unbounded streams, how do
>>>>>>>>> we sort them without consuming all of them first?
>>>>>>>>>
>>>>>>>>> On Tue, Sep 23, 2014 at 7:06 PM, William Le Ferrand <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Viktor,
>>>>>>>>>>
>>>>>>>>>> It would be an unbounded stream of ordered items (potentially as
>>>>>>>>>> slow as the slowest source stream)
>>>>>>>>>>
>>>>>>>>>> On Tue, Sep 23, 2014 at 3:59 PM, √iktor Ҡlang <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi William,
>>>>>>>>>>>
>>>>>>>>>>> I am not sure I understand the question: given N unbounded
>>>>>>>>>>> streams of comparable items, what does the resulting stream
>>>>>>>>>>> look like?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Sep 23, 2014 at 1:31 PM, William Le Ferrand <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Dear List,
>>>>>>>>>>>>
>>>>>>>>>>>> I have a set of reactive streams that contain ordered data. Is
>>>>>>>>>>>> there an easy way to "sort merge" them into a single reactive
>>>>>>>>>>>> stream? I couldn't find anything in the documentation or on the
>>>>>>>>>>>> web; I came up with a handmade solution, but it looks
>>>>>>>>>>>> unnecessarily obfuscated.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks in advance,
>>>>>>>>>>>>
>>>>>>>>>>>> Best
>>>>>>>>>>>> William
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> William Le Ferrand
>>>>>>>>>>>>
>>>>>>>>>>>> Mobile : (+1) (415) 683-1484
>>>>>>>>>>>> Web : http://williamleferrand.github.com/
>>>>>>>>>>>> <http://www.linkedin.com/in/williamleferrand>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>>>>>>>>>>> ---
>>>>>>>>>>>> You received this message because you are subscribed to the
>>>>>>>>>>>> Google Groups "Akka User List" group.
>>>>>>>>>>>> To unsubscribe from this group and stop receiving emails from
>>>>>>>>>>>> it, send an email to [email protected].
>>>>>>>>>>>> To post to this group, send email to [email protected].
>>>>>>>>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>>>>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Cheers,
>>>>>>>>>>> √
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Cheers,
>>>>>>>>> √
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Cheers,
>>>>>>> √
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> √
>>>>
>>>>
>>>
>>>
>>>
>>>
>>>
>>
>



-- 
William Le Ferrand

Mobile : (+1) (415) 683-1484
Web : http://williamleferrand.github.com/
<http://www.linkedin.com/in/williamleferrand>

