Hi all, I have a very small stream that takes data from a GraphStage and 
pushes it to a file: 

// Materializes to (UniqueKillSwitch, Future[IOResult])
val (killSwitch, ioResult) = dataSource
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(FileIO.toPath(Paths.get("/home/brian/test")))(Keep.both)
  .run()


The dataSource is a GraphStage that takes data from a native method that 
talks to hardware. There's a fairly large amount of data that comes in 
(~1MB/s). The template for the GraphStage is 
https://github.com/akka/akka/blob/bd8fcc9d9aeafc2c49745e7519d13b3e0abc18e6/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala#L48-L68
 
with an onPull() that looks like this:

override def onPull(): Unit = {
  if (isAvailable(out)) {
    val pulled: ByteString = nativePull()
    if (pulled != null) {
      push(out, pulled)
    }
  }
}


In some cases, nativePull() does not have any data from the device. In that 
case, the code does not do a push(). 

I've seen that the GraphStage only gets a single onPull() per demand from 
downstream -- if that call returns without pushing, onPull() is not called 
again and the stream stalls. 

What is the correct way to resume emitting once a call to nativePull() 
would succeed again? Or is it as simple as making nativePull() block until 
data is available? When the Source has no data, the whole stream should 
sit idle rather than spin with nothing to work on...
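
For reference, one possible way to avoid both blocking and stalling is to 
let the stage retry on a timer while demand is outstanding. This is only a 
sketch: it assumes nativePull() returns quickly and yields null when the 
device has nothing, and the names PollingDeviceSource, the nativePull 
function parameter, and retryInterval are made up for illustration. 

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler, TimerGraphStageLogic}
import akka.util.ByteString
import scala.concurrent.duration._

// Hypothetical source stage: nativePull stands in for the real native call
// and is assumed to be non-blocking, returning null when no data is ready.
class PollingDeviceSource(
    nativePull: () => ByteString,
    retryInterval: FiniteDuration = 10.millis)
    extends GraphStage[SourceShape[ByteString]] {

  val out: Outlet[ByteString] = Outlet("PollingDeviceSource.out")
  override val shape: SourceShape[ByteString] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) {

      // Push if the device has data; otherwise remember the demand by
      // scheduling another attempt instead of silently dropping it.
      private def tryPush(): Unit = {
        val pulled = nativePull()
        if (pulled != null) push(out, pulled)
        else scheduleOnce("retry", retryInterval)
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = tryPush()
      })

      // Runs inside the stage, so calling push() here is safe.
      override protected def onTimer(timerKey: Any): Unit =
        if (isAvailable(out)) tryPush()
    }
}
```

With this shape the stream stays idle between retries and nothing spins. 
If the native layer can instead signal "data ready" from its own thread, 
getAsyncCallback may be a better fit than polling, since the callback can 
do the push as soon as data arrives. 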

Thanks! Brian
