Ah!  I get it.  (Any sleeps are just for testing, to exaggerate the timing 
behavior so I can see how things will work -- there is no sleep in the final version. ;-) )

For anyone reading this thread in the future, here's what finally worked 
for me:

/**
 * Flow stage that batches incoming `Int`s into `List[Int]` chunks.
 *
 * A chunk is pushed downstream when either
 *  - the buffer already holds `CHUNK_SIZE` (or more) elements and downstream
 *    has demand, or
 *  - the 3-second flush timer fires while the buffer is non-empty and
 *    downstream has demand.
 *
 * When upstream finishes, whatever is still buffered is emitted in chunks of
 * at most `CHUNK_SIZE` elements and the stage then completes.
 *
 * @param coll buffer the stage accumulates into. It is mutable and owned by
 *             the caller, so every materialization of the same `Mixer` shares
 *             it. NOTE(review): presumably the stage is materialized only
 *             once -- confirm with callers.
 */
case class Mixer(coll: scala.collection.mutable.ListBuffer[Int])
    extends GraphStage[FlowShape[Int, List[Int]]] {

  val out: Outlet[List[Int]] = Outlet("Stuff")

  val data: Inlet[Int] = Inlet("data")

  /** Number of buffered elements at which a chunk is flushed without waiting for the timer. */
  val CHUNK_SIZE = 4

  override val shape: FlowShape[Int, List[Int]] = new FlowShape(data, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) {

      // Single timer, re-armed on every transmit attempt.
      private val flushTimerKey = true
      private val flushInterval = 3.seconds

      override def preStart(): Unit = {
        scheduleOnce(flushTimerKey, flushInterval)
        pull(data)
      }

      /**
       * Moves one pending element from the inlet into the buffer, first
       * flushing the buffer if it is already full. If the buffer is full and
       * cannot be flushed (no downstream demand) the element stays on the
       * inlet, which back-pressures upstream until downstream pulls.
       */
      def process(): Unit = {
        // `transmit()` is only attempted when the buffer is full; `>=` rather
        // than `==` so a caller-supplied, over-full buffer is flushed as well.
        val hasRoom = coll.size < CHUNK_SIZE || transmit()
        if (hasRoom && isAvailable(data)) {
          val a = grab(data)
          coll += a
          pull(data)
          println("Data: " + a)
        }
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = process()
      })

      setHandler(data, new InHandler {
        override def onPush(): Unit = process()

        // The default implementation completes the stage immediately, which
        // would drop everything still buffered. Drain the buffer first, then
        // complete, so the stream terminates instead of hanging forever.
        override def onUpstreamFinish(): Unit = {
          cancelTimer(flushTimerKey)
          // An element may still be parked on the inlet if the buffer was
          // full and downstream had no demand when it arrived.
          if (isAvailable(data)) coll += grab(data)
          if (coll.nonEmpty) {
            val remaining = coll.toList
            coll.clear()
            // `emitMultiple` waits for downstream demand; the completion
            // requested below is deferred until the emission has finished.
            emitMultiple(out, remaining.grouped(CHUNK_SIZE))
          }
          completeStage()
        }
      })

      /**
       * Re-arms the flush timer and, if the buffer is non-empty and
       * downstream has demand, pushes the buffered elements as one list.
       *
       * @return `true` if a chunk was pushed (the buffer is now empty),
       *         `false` otherwise
       */
      def transmit(): Boolean = {
        println("--xmit--")
        scheduleOnce(flushTimerKey, flushInterval)
        if (coll.nonEmpty && isAvailable(out)) {
          push(out, coll.toList)
          coll.clear()
          true
        } else {
          false
        }
      }

      override def onTimer(key: Any): Unit = {
        println("Time!")
        transmit()
      }
    }
}

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to