I tried to follow the documentation as close as possible.
(http://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-integrations.html)
I have an application that I want to wake up every 30 seconds and read a
document from the web. It'll parse that document into my own objects and
create a stream of those.
I am using akka's scheduler to wake up every 30 seconds, and send a message
to an actor.
This actor is an ActorPublisher, so I can convert it to a stream.
It all ALMOST works. The actor wakes up, but totalDemand is always zero, so
I never get to call onNext.
What do I need to do to get this to run?
Is there a different way of reading a document every 30 seconds that uses
only streams and no actors?
Here's my code (modified to be a bit simpler, but same problem)
object Yoyo extends App {
implicit val actorSystem = ActorSystem("npm-stream-reader")
implicit val materializer = ActorFlowMaterializer()
import actorSystem.dispatcher
val log = Logging(actorSystem, getClass)
object Reader {
def props = Props[Reader]
case object ReadNow
case object Accepted
case object Denied
}
class Reader extends ActorPublisher[String] {
import akka.stream.actor.ActorPublisherMessage._
import Reader._
var buf = Vector.empty[String]
val MaxBufferSize = 100
def receive = {
case ReadNow if buf.size == MaxBufferSize ⇒
sender() ! Denied
case ReadNow ⇒
sender() ! Accepted
log.debug("Reader awakened, trying to read now")
//Read some stuff from the web
val stream = Source(1 to 5).map(a ⇒ a.toString)
val sink = Sink.foreach[String] {
str ⇒
log.debug(s"The total demand is ${totalDemand}")
log.debug(s"The buffer has ${buf.size} entries")
log.debug(str)
if (buf.isEmpty && totalDemand > 0) {
onNext(str)
} else {
buf :+= str
deliverBuf()
}
}
stream.runWith(sink)
case Request(n) ⇒
log.debug(s"${n} Requested")
deliverBuf()
case Cancel ⇒
log.debug(s"Cancel Requested")
context.stop(self)
case a: Any ⇒
log.debug(s"Something I know nothing about ${a}")
}
@tailrec final def deliverBuf(): Unit =
if (totalDemand > 0) {
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNext
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use foreach onNext
deliverBuf()
}
}
}
val reader = actorSystem.actorOf(Reader.props)
val source = Source.actorPublisher[String](Reader.props)
val ref = Flow[String]
.map { str ⇒
{
//////////////////////////////////
// THIS NEVER GETS CALLED!!!!!
log.debug(str)
}
str
}
.to(Sink.ignore)
.runWith(source)
actorSystem.scheduler.schedule(1 second, 30 seconds, reader,
Reader.ReadNow)
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.