I have what seems a very simple problem but cannot come up with a solution 
for it.
Firstly, I have a flow that does some accumulation, therefore only emits 
after it has received a number of elements. As a simple example:

val accumulator =
  Flow[Int]
    .statefulMapConcat { () ⇒
      var total = 0
      i ⇒ {
        val newTotal = total + i
        if (newTotal >= 10) {
          total = 0
          List(newTotal)
        }
        else {
          total = newTotal
          List()
        }
      }
    }


This works as expected:

val result = Source(List(2, 5, 6, 3, 8, 4, 1, 8, 7, 4))
  .via(accumulator)
  .runWith(Sink.fold(Seq.empty[Int])(_ :+ _))

result.futureValue shouldBe Seq(13, 11, 13, 11)


Now, I want to record the index of the first element which started the 
accumulation. In my actual application this is a java.time.Instant, but 
Long suffices for this example. As a first naive attempt, using the Graph 
DSL:

val flow =
  Flow[(Int, Long)]
    .via(Flow.fromGraph(GraphDSL.create() { implicit builder ⇒
      import GraphDSL.Implicits._
      val unzip = builder.add(Unzip[Int, Long]())
      val acc = builder.add(accumulator)
      val zip = builder.add(Zip[Int, Long]())

      unzip.out0 ~> acc ~> zip.in0
      unzip.out1 ~>        zip.in1

      FlowShape[(Int, Long), (Int, Long)](unzip.in, zip.out)
    }))

val result = Source(List(2, 5, 6, 3, 8, 4, 1, 8, 7, 4)).zipWithIndex
  .via(flow)
  .runWith(Sink.fold(Seq.empty[(Int, Long)])(_ :+ _))

result.futureValue shouldBe Seq((13, 0), (11, 3), (13, 5), (11, 8))


which times out as no elements flow through. So adding a buffer:

val flow =
  Flow[(Int, Long)]
    .via(Flow.fromGraph(GraphDSL.create() { implicit builder ⇒
      import GraphDSL.Implicits._
      val unzip = builder.add(Unzip[Int, Long]())
      val acc = builder.add(accumulator)
      val zip = builder.add(Zip[Int, Long]())

      val buffer = builder.add(Flow[Long].buffer(1, OverflowStrategy.dropTail))

      unzip.out0 ~> acc    ~> zip.in0
      unzip.out1 ~> buffer ~> zip.in1

      FlowShape[(Int, Long), (Int, Long)](unzip.in, zip.out)
    }))


and I get output, but not what I need:

List((13,0), (11,2), (13,4), (11,7)) was not equal to List((13,0), (11,3), 
(13,5), (11,8))

I think I know what is happening - the first Long element is buffered in 
the zip stage and so the element in the buffer is not actually the one I 
expect. But I don't know how to get around it. Using conflate ends up with 
exactly the same result

Note that I can't easily rewrite the accumulator stage, if I could I would 
just process the pairs together in a stateful stage. 

Thanks,

Julian

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to