Hi all, I have a very small stream that takes data from a GraphStage and
pushes it to a file:
val (killSwitch, flow) = dataSource
.viaMat(KillSwitches.single)(Keep.right)
.toMat(FileIO.toPath(Paths.get("/home/brian/test")))(Keep.both)
.run()
The dataSource is a GraphStage that takes data from a native method that
talks to hardware. There's a fairly large amount of data that comes in
(~1MB/s). The template for the GraphStage is
https://github.com/akka/akka/blob/bd8fcc9d9aeafc2c49745e7519d13b3e0abc18e6/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala#L48-L68
with an onPull() that looks like this:
override def onPull(): Unit = {
if (isAvailable(out)) {
val pulled: ByteString = nativePull()
if (pulled != null) {
push(out, pulled)
}
}
}
In some cases, nativePull() does not have any data from the device. In that
case, the code does not do a push().
I've seen that the GraphStage only gets a single onPull() -- if it does not
return data, the stream stalls.
What is the correct means to restart the calls to onPull() once it's
determined that a call to nativePull() would be successful? Maybe it's as
simple as the call to nativePull() should block? A Source that has no data
definitely qualifies the entire stream to not idly spin with no data to
work on...
Thanks! Brian
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.