On Wednesday, November 16, 2016 at 11:04:55 AM UTC, √ wrote:
>
> How would auxiliary stages know which index the accumulation started at?
>

That's where I was hoping an appropriate buffer stage could help, by 
keeping only the index that corresponds to the first packet received after 
the accumulator has emitted. But if not ...

The actual use case is receiving chunked packets over a TCP stream, where I 
need to keep track of when the first chunk of each packet arrived. I was 
hoping to use the built-in Framing.lengthField flow, but will just implement 
it myself, keeping track of the timestamp. As an aside, is there any general 
value in extending that framing flow to allow storing additional state? For 
instance, in addition to tracking the timestamp, I am also extracting the 
original IP address using the PROXY protocol.
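
A minimal sketch of what such a hand-rolled framing step might look like, 
under stated assumptions: it is not the Framing.lengthField implementation, 
it assumes a 1-byte unsigned length prefix, and it uses plain Vector[Byte] 
instead of ByteString so that it runs without Akka on the classpath. The 
names Frame, FrameState and TimestampedFraming.step are made up for 
illustration.

```scala
// Hypothetical types for illustration only; none of these exist in Akka.
final case class Frame(payload: Vector[Byte], firstChunkAt: Long)

// Bytes buffered so far, plus the arrival time of the chunk that started the
// frame currently being assembled (None while the buffer is empty).
final case class FrameState(buffer: Vector[Byte], startedAt: Option[Long])

object TimestampedFraming {
  val empty: FrameState = FrameState(Vector.empty, None)

  // Assumed wire format: 1-byte unsigned length, then that many payload bytes.
  // `now` is the arrival time of `chunk`, supplied by the caller.
  def step(state: FrameState, chunk: Vector[Byte], now: Long): (FrameState, List[Frame]) = {
    @annotation.tailrec
    def drain(buf: Vector[Byte], started: Option[Long], out: List[Frame]): (FrameState, List[Frame]) =
      if (buf.isEmpty) (FrameState(buf, None), out.reverse)
      else {
        // A frame with no recorded start time begins in the current chunk.
        val startTime = started.getOrElse(now)
        val len = buf.head & 0xff
        if (buf.length - 1 >= len) {
          val frame = Frame(buf.slice(1, 1 + len), startTime)
          // Leftover bytes belong to a new frame that starts in this chunk.
          drain(buf.drop(1 + len), None, frame :: out)
        } else {
          // Incomplete frame: keep the bytes and remember when it started.
          (FrameState(buf, Some(startTime)), out.reverse)
        }
      }

    drain(state.buffer ++ chunk, state.startedAt, Nil)
  }
}
```

In a stream, the same transition could presumably live inside a 
statefulMapConcat closure, in the style of the accumulator quoted below, 
with the state held in a var. Extra per-frame state, such as an address 
parsed from a PROXY protocol header, would be additional fields on 
FrameState and Frame.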
 

>
> On Wed, Nov 16, 2016 at 12:02 PM, Julian Howarth <[email protected]> wrote:
>
>> Thanks Viktor,
>>
>> But as I said, I don't have access to change the accumulator flow - it's 
>> a black box that I otherwise would have to reimplement myself. That's why 
>> I'm trying to work around it with the Graph DSL.
>>
>> Julian
>>
>>
>> On Wednesday, November 16, 2016 at 10:05:27 AM UTC, √ wrote:
>>>
>>> val accumulator =
>>>   Flow[Int]
>>>     .statefulMapConcat { () ⇒
>>>       var total = 0
>>>       var curIdx = 0L
>>>       var startIdx = curIdx
>>>       i ⇒ {
>>>         val newTotal = total + i
>>>         if (newTotal >= 10) {
>>>           val result = List((newTotal, startIdx))
>>>           curIdx += 1
>>>           startIdx = curIdx
>>>           total = 0
>>>           result
>>>         } else {
>>>           val result = List.empty[(Int, Long)]
>>>           curIdx += 1
>>>           total = newTotal
>>>           result
>>>         }
>>>       }
>>>     }
>>>
>>>
>>> On Wed, Nov 16, 2016 at 10:18 AM, Julian Howarth <[email protected]> 
>>> wrote:
>>>
>>>> I have what seems like a very simple problem but cannot come up with a 
>>>> solution for it.
>>>> Firstly, I have a flow that does some accumulation, therefore only 
>>>> emits after it has received a number of elements. As a simple example:
>>>>
>>>> val accumulator =
>>>>   Flow[Int]
>>>>     .statefulMapConcat { () ⇒
>>>>       var total = 0
>>>>       i ⇒ {
>>>>         val newTotal = total + i
>>>>         if (newTotal >= 10) {
>>>>           total = 0
>>>>           List(newTotal)
>>>>         }
>>>>         else {
>>>>           total = newTotal
>>>>           List()
>>>>         }
>>>>       }
>>>>     }
>>>>
>>>>
>>>> This works as expected:
>>>>
>>>> val result = Source(List(2, 5, 6, 3, 8, 4, 1, 8, 7, 4))
>>>>   .via(accumulator)
>>>>   .runWith(Sink.fold(Seq.empty[Int])(_ :+ _))
>>>>
>>>> result.futureValue shouldBe Seq(13, 11, 13, 11)
>>>>
>>>>
>>>> Now, I want to record the index of the first element which started the 
>>>> accumulation. In my actual application this is a java.time.Instant, but 
>>>> Long suffices for this example. As a first naive attempt, using the Graph 
>>>> DSL:
>>>>
>>>> val flow =
>>>>   Flow[(Int, Long)]
>>>>     .via(Flow.fromGraph(GraphDSL.create() { implicit builder ⇒
>>>>       import GraphDSL.Implicits._
>>>>       val unzip = builder.add(Unzip[Int, Long]())
>>>>       val acc = builder.add(accumulator)
>>>>       val zip = builder.add(Zip[Int, Long]())
>>>>
>>>>       unzip.out0 ~> acc ~> zip.in0
>>>>       unzip.out1 ~>        zip.in1
>>>>
>>>>       FlowShape[(Int, Long), (Int, Long)](unzip.in, zip.out)
>>>>     }))
>>>>
>>>> val result = Source(List(2, 5, 6, 3, 8, 4, 1, 8, 7, 4)).zipWithIndex
>>>>   .via(flow)
>>>>   .runWith(Sink.fold(Seq.empty[(Int, Long)])(_ :+ _))
>>>>
>>>> result.futureValue shouldBe Seq((13, 0), (11, 3), (13, 5), (11, 8))
>>>>
>>>>
>>>> which times out as no elements flow through. So adding a buffer:
>>>>
>>>> val flow =
>>>>   Flow[(Int, Long)]
>>>>     .via(Flow.fromGraph(GraphDSL.create() { implicit builder ⇒
>>>>       import GraphDSL.Implicits._
>>>>       val unzip = builder.add(Unzip[Int, Long]())
>>>>       val acc = builder.add(accumulator)
>>>>       val zip = builder.add(Zip[Int, Long]())
>>>>
>>>>       val buffer = builder.add(Flow[Long].buffer(1, OverflowStrategy.dropTail))
>>>>
>>>>       unzip.out0 ~> acc    ~> zip.in0
>>>>       unzip.out1 ~> buffer ~> zip.in1
>>>>
>>>>       FlowShape[(Int, Long), (Int, Long)](unzip.in, zip.out)
>>>>     }))
>>>>
>>>>
>>>> and I get output, but not what I need:
>>>>
>>>> List((13,0), (11,2), (13,4), (11,7)) was not equal to List((13,0), 
>>>> (11,3), (13,5), (11,8))
>>>>
>>>> I think I know what is happening: the first Long element is buffered 
>>>> in the zip stage, so the element in the buffer is not actually the one 
>>>> I expect. But I don't know how to get around it. Using conflate ends up 
>>>> with exactly the same result.
>>>>
>>>> Note that I can't easily rewrite the accumulator stage. If I could, I 
>>>> would just process the pairs together in a stateful stage.
>>>>
>>>> Thanks,
>>>>
>>>> Julian
>>>>
>>>> -- 
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ: 
>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>> >>>>>>>>>> Search the archives: 
>>>> https://groups.google.com/group/akka-user
>>>> --- 
>>>> You received this message because you are subscribed to the Google 
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>> an email to [email protected].
>>>> To post to this group, send email to [email protected].
>>>> Visit this group at https://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>>>
>>>
>>> -- 
>>> Cheers,
>>> √
>>>
>>
>
>
>
> -- 
> Cheers,
> √
>
