I am trying to write a flow that accumulates a collection of data (Int in
this example) and emits a List[Int] when either 1) the collection reaches a
certain size or 2) a timer tick is received.
I've implemented the code below. I have a println in the process() method
to know what's going on.
When I run this I get output once (false false true) then never
again...like the stream is stuck. My expectation would be that the
dataReady flag would soon be set as data is pushed from the input source,
but that's not happening.
What am I missing? Why is my stream not streaming?
Thanks for any help!
Greg
case class Accumulator(coll:scala.collection.mutable.ListBuffer[Int])
extends GraphStage[FanInShape2[Boolean,Int,List[Int]]] {
val out:Outlet[List[Int]] = Outlet("Stuff")
val tick:Inlet[Boolean] = Inlet("tick")
val data:Inlet[Int] = Inlet("data")
val CHUNK_SIZE = 5
private var dataReady = false
private var tickReady = false
override val shape:FanInShape2[Boolean,Int,List[Int]] = new FanInShape2(
tick,data,out)
override def createLogic(inheritedAttributes: Attributes):
GraphStageLogic = new GraphStageLogic(shape) {
def process() {
println(s"$tickReady $dataReady ${isAvailable(out)}") // prints false
false true the one time it outputs anything
if( coll.size < CHUNK_SIZE && dataReady ) {
coll += grab(data)
dataReady = false
}
if(isAvailable(out) && coll.size > 0 && (tickReady || coll.size ==
CHUNK_SIZE)) {
tickReady = false
push(out,coll.to[List])
coll.clear
if(dataReady)
process() // in case data is ready but collection was full
}
}
setHandler(out, new OutHandler{
override def onPull():Unit = {
process()
}
})
setHandler(data, new InHandler{
override def onPush():Unit = {
dataReady = true
process()
}
})
setHandler(tick, new InHandler{
override def onPush():Unit = {
tickReady = true
grab(tick) // clear tick's payload...don't care about it otherwise
process()
}
})
}
}
case class Stuff() {
val rateLimit = Flow[Int].map(i => { Thread.sleep(100); i })
val collection = scala.collection.mutable.ListBuffer.empty[Int]
val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder:
FlowGraph.Builder[Unit] =>
import FlowGraph.Implicits._
val numSource = Source(1 to 100)
val tickSource = Source.tick[Boolean](1 second, 1 second, true)
val toss = Sink.ignore
val mix = builder.add(Accumulator(collection))
val show = Flow[List[Int]].map(i => {println(i);i})
tickSource ~> mix.in0
numSource ~> rateLimit ~> mix.in1
mix.out ~> show ~> toss
ClosedShape
})
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.