I can find an example of this for an older version of akka-streams, but not
one that works with Akka Streams 1.0-M4.
In particular, I am having trouble figuring out how to emit the buffered
element (reference) after both streams have closed.
I have tried switching to the state below once both streams have closed, but
its onInput function is never called. I am not sure how to frame the read
condition so that it gets called when downstream wants an element:
def produceRefAndClose = State(ReadAll(s.input1, s.input2)) {
  (ctx, input, elem) ⇒
    ctx.emit(ref)
    ctx.finish()
    SameState
}
Or maybe I am going about this completely the wrong way. Full code below:
object StreamMerge extends App {
  implicit val actorSystem = ActorSystem("LogFiles")
  implicit val mat = ActorFlowMaterializer()

  class OrderedMergeShape[A](_init: Init[A] = Name("OrderedMerge"))
      extends FanInShape[A](_init) {
    val input1 = newInlet[A]("input1")
    val input2 = newInlet[A]("input2")
    override protected def construct(init: Init[A]): FanInShape[A] =
      new OrderedMergeShape(init)
  }

  class IntOrderMerge
      extends FlexiMerge[Int, OrderedMergeShape[Int]](
        new OrderedMergeShape(), OperationAttributes.name("OrderedMerge")) {
    import FlexiMerge._

    override def createMergeLogic(s: OrderedMergeShape[Int]): MergeLogic[Int] =
      new MergeLogic[Int] {
        var ref: Int = _
        var refValid = false

        def other(input: InPort): Inlet[Int] =
          if (input eq s.input1) s.input2 else s.input1

        def getFirstElement = State[Int](ReadAny(s.input1, s.input2)) {
          (ctx, input, elem) ⇒
            ref = elem
            refValid = true
            ctx.changeCompletionHandling(emitOtherOnClose)
            readUntilLarger(other(input))
        }

        override def initialState: State[Int] = getFirstElement

        def readUntilLarger(curInput: Inlet[Int]): State[Int] =
          State[Int](Read(curInput)) {
            (ctx, input, elem) ⇒
              if (elem <= ref) {
                ctx.emit(elem)
                SameState
              }
              else {
                ctx.emit(ref)
                ref = elem
                readUntilLarger(other(input))
              }
          }

        def readRemaining(curInput: Inlet[Int]) = State[Int](Read(curInput)) {
          (ctx, input, elem) ⇒
            if (elem <= ref)
              ctx.emit(elem)
            else {
              ctx.emit(ref)
              ref = elem
            }
            SameState
        }

        def produceRefAndClose = State(ReadAll(s.input1, s.input2)) {
          (ctx, input, elem) ⇒
            ctx.emit(ref)
            ctx.finish()
            SameState
        }

        val emitLast = CompletionHandling(
          onUpstreamFinish = { (ctx, input) ⇒
            ctx.changeCompletionHandling(defaultCompletionHandling)
            produceRefAndClose
          },
          onUpstreamFailure = { (ctx, _, cause) ⇒
            ctx.fail(cause)
            SameState
          }
        )

        val emitOtherOnClose = CompletionHandling(
          onUpstreamFinish = { (ctx, input) ⇒
            ctx.changeCompletionHandling(emitLast)
            readRemaining(other(input))
          },
          onUpstreamFailure = { (ctx, _, cause) ⇒
            ctx.fail(cause)
            SameState
          }
        )

        override def initialCompletionHandling: CompletionHandling =
          emitOtherOnClose
      }
  }

  def printIntStream(): Unit = {
    val srcA = Source(List(1,2,2,3,7,8))
    val srcB = Source(List(2,3,3,6,7))
    val sink = Sink.foreach(println)
    val fGraph = FlowGraph.closed() { implicit b ⇒
      import FlowGraph.Implicits._
      val intOrderMerge = b.add(new IntOrderMerge)
      srcA ~> intOrderMerge.input1
      srcB ~> intOrderMerge.input2
      intOrderMerge.out ~> sink
    }
    println("Running flow")
    fGraph.run()
  }

  printIntStream()
}
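
For reference, here is a minimal sketch of the output the merge above is
meant to produce, written against plain Scala iterators rather than the
FlexiMerge API. It is only a model of the intended semantics, not a fix for
the stream code; the names OrderedMergeSketch and orderedMerge are
hypothetical and do not come from Akka.

```scala
// Plain-Scala model of an ordered merge of two already-sorted inputs.
// No Akka APIs are used; OrderedMergeSketch and orderedMerge are
// hypothetical names chosen for this illustration.
object OrderedMergeSketch {
  def orderedMerge(a: Iterator[Int], b: Iterator[Int]): Iterator[Int] = {
    // BufferedIterator lets us look at the head without consuming it.
    val left = a.buffered
    val right = b.buffered
    new Iterator[Int] {
      def hasNext: Boolean = left.hasNext || right.hasNext
      def next(): Int =
        if (!right.hasNext) left.next()        // right is finished: drain left
        else if (!left.hasNext) right.next()   // left is finished: drain right
        else if (left.head <= right.head) left.next()
        else right.next()
    }
  }

  def main(args: Array[String]): Unit = {
    // Same inputs as srcA and srcB in printIntStream above.
    val merged = orderedMerge(Iterator(1, 2, 2, 3, 7, 8), Iterator(2, 3, 3, 6, 7))
    println(merged.mkString(" "))
  }
}
```

With the two test lists above this yields 1 2 2 2 3 3 3 6 7 7 8. Since ref in
the FlexiMerge version always holds the largest element read so far, the
trailing 8 should be the value still buffered in ref when both inputs have
finished, which is the element that currently never gets emitted.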
On Wednesday, 10 December 2014 09:51:11 UTC, drewhk wrote:
>
>
>
> On Tue, Dec 9, 2014 at 8:07 PM, William Le Ferrand wrote:
>
>> Dear List,
>>
>> I'm trying to revisit this problem now that streams is in a stable state,
>> but I don't quite see how to use MergeLogic to perform this sort merge. Is
>> it actually possible, and would anyone have hints toward a working
>> solution?
>>
>
> Yes, in the test for FlexiMerge there is a sorting merge. Docs will come
> soon and will contain a cookbook section with recipes like this.
>
> -Endre
>
>
>>
>> thanks in advance,
>>
>> best
>>
>> william
>>
>> On Wed, Sep 24, 2014 at 7:28 AM, √iktor Ҡlang wrote:
>>
>>> +1
>>>
>>> On Wed, Sep 24, 2014 at 5:07 AM, Endre Varga wrote:
>>>
>>>> There will be all kinds of merges available in the future. Akka Streams
>>>> is still a preview so we change stuff all the time. To get to the point
>>>> where we can support pluggable merges we needed the graph API and support
>>>> for n-way fan-in operations first. So stay tuned :)
>>>>
>>>> -Endre
>>>>
>>>> On Wed, Sep 24, 2014 at 1:43 AM, William Le Ferrand wrote:
>>>>
>>>>> It looks like this:
>>>>> https://gist.github.com/williamleferrand/c77133af821d5eb278be (I feel
>>>>> bad looking at it)
>>>>>
>>>>> On Tue, Sep 23, 2014 at 4:38 PM, √iktor Ҡlang wrote:
>>>>>
>>>>>> Now it is way more clear! :) So you have 2 sorted streams that you
>>>>>> want to merge to be one sorted stream.
>>>>>> Right now I don't think there's a good machinery for doing this, but
>>>>>> I may be misinformed.
>>>>>> What does the solution that you came up with look like?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 23, 2014 at 7:29 PM, William Le Ferrand wrote:
>>>>>>
>>>>>>> Sorry, the source streams are already sorted (that's what I meant
>>>>>>> earlier by "ordered data"), so you just have to compare the heads of
>>>>>>> your two sources to decide which element to push to the outbound
>>>>>>> stream first. Does that make sense?
>>>>>>>
>>>>>>> On Tue, Sep 23, 2014 at 4:24 PM, √iktor Ҡlang wrote:
>>>>>>>
>>>>>>>> I still don't understand: if we have 2 unbounded streams, how do we
>>>>>>>> sort them without consuming all of them first?
>>>>>>>>
>>>>>>>> On Tue, Sep 23, 2014 at 7:06 PM, William Le Ferrand wrote:
>>>>>>>>
>>>>>>>>> Viktor,
>>>>>>>>>
>>>>>>>>> It would be an unbounded stream of ordered items (potentially as
>>>>>>>>> slow as the slowest source stream)
>>>>>>>>>
>>>>>>>>> On Tue, Sep 23, 2014 at 3:59 PM, √iktor Ҡlang wrote:
>>>>>>>>>
>>>>>>>>>> Hi William,
>>>>>>>>>>
>>>>>>>>>> I am not sure I understand the question: given N unbounded
>>>>>>>>>> streams of comparable items, what does the resulting stream look
>>>>>>>>>> like?
>>>>>>>>>>
>>>>>>>>>> On Tue, Sep 23, 2014 at 1:31 PM, William Le Ferrand wrote:
>>>>>>>>>>
>>>>>>>>>>> Dear List,
>>>>>>>>>>>
>>>>>>>>>>> I have a set of reactive streams that contain ordered data. Is
>>>>>>>>>>> there an easy way to "sort merge" them into a single reactive
>>>>>>>>>>> stream? I couldn't find anything in the documentation or on the
>>>>>>>>>>> web; I came up with a handmade solution, but it looks
>>>>>>>>>>> unnecessarily obfuscated.
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance,
>>>>>>>>>>>
>>>>>>>>>>> Best
>>>>>>>>>>> William
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> William Le Ferrand
>>>>>>>>>>>
>>>>>>>>>>> Mobile : (+1) (415) 683-1484
>>>>>>>>>>> Web : http://williamleferrand.github.com/
>>>>>>>>>>> <http://www.linkedin.com/in/williamleferrand>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>>> >>>>>>>>>> Check the FAQ:
>>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>> >>>>>>>>>> Search the archives:
>>>>>>>>>>> https://groups.google.com/group/akka-user
>>>>>>>>>>> ---
>>>>>>>>>>> You received this message because you are subscribed to the
>>>>>>>>>>> Google Groups "Akka User List" group.
>>>>>>>>>>> To unsubscribe from this group and stop receiving emails from
>>>>>>>>>>> it, send an email to [email protected]
>>>>>>>>>>> <javascript:>.
>>>>>>>>>>> To post to this group, send email to [email protected]
>>>>>>>>>>> <javascript:>.
>>>>>>>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>>>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Cheers,
>>>>>>>>>> √
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Cheers,
>>>>>>>> √
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> √
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Cheers,
>>> √
>>>
>>>
>>
>>
>>
>>
>
>