I tried to follow the documentation as closely as possible
(http://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-integrations.html).
I have an application that I want to wake up every 30 seconds and read a 
document from the web. It'll parse that document into my own objects and 
create a stream of those.
I am using Akka's scheduler to wake up every 30 seconds and send a message 
to an actor. 
This actor is an ActorPublisher, so I can convert it to a stream.
It ALMOST works. The actor wakes up, but totalDemand is always zero, so 
I never get to call onNext. 
What do I need to do to get this to run?
Is there a different way of reading a document every 30 seconds that uses 
only streams and no actors?

Here's my code (simplified a bit, but it shows the same problem):

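import akka.actor.{ActorSystem, Props}
import akka.event.Logging
import akka.stream.ActorFlowMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.language.postfixOps

object Yoyo extends App {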
  implicit val actorSystem = ActorSystem("npm-stream-reader")
  implicit val materializer = ActorFlowMaterializer()
  import actorSystem.dispatcher
  val log = Logging(actorSystem, getClass)
  object Reader {
    def props = Props[Reader]
    case object ReadNow
    case object Accepted
    case object Denied
  }
  class Reader extends ActorPublisher[String] {
    import akka.stream.actor.ActorPublisherMessage._
    import Reader._
    var buf = Vector.empty[String]
    val MaxBufferSize = 100

    def receive = {
      case ReadNow if buf.size == MaxBufferSize ⇒
        sender() ! Denied
      case ReadNow ⇒
        sender() ! Accepted
        log.debug("Reader awakened, trying to read now")
        //Read some stuff from the web
        val stream = Source(1 to 5).map(a ⇒ a.toString)

        val sink = Sink.foreach[String] {
          str ⇒
            log.debug(s"The total demand is ${totalDemand}")
            log.debug(s"The buffer has ${buf.size} entries")
            log.debug(str)
            if (buf.isEmpty && totalDemand > 0) {
              onNext(str)
            } else {
              buf :+= str
              deliverBuf()
            }
        }
        stream.runWith(sink)
      case Request(n) ⇒
        log.debug(s"${n} Requested")
        deliverBuf()
      case Cancel ⇒
        log.debug(s"Cancel Requested")
        context.stop(self)
      case a: Any ⇒
        log.debug(s"Something I know nothing about ${a}")
    }
    @tailrec final def deliverBuf(): Unit =
      if (totalDemand > 0) {
        if (totalDemand <= Int.MaxValue) {
          val (use, keep) = buf.splitAt(totalDemand.toInt)
          buf = keep
          use foreach onNext
        } else {
          val (use, keep) = buf.splitAt(Int.MaxValue)
          buf = keep
          use foreach onNext
          deliverBuf()
        }
      }
  }
  val reader = actorSystem.actorOf(Reader.props)
  val source = Source.actorPublisher[String](Reader.props)

  val ref = Flow[String]
    .map { str ⇒
      {
        //////////////////////////////////
        // THIS NEVER GETS CALLED!!!!!
        log.debug(str)
      }
      str
    }
    .to(Sink.ignore)
    .runWith(source)

  actorSystem.scheduler.schedule(1 second, 30 seconds, reader, Reader.ReadNow)
}
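
For reference, the demand rule that deliverBuf relies on can be modeled in plain Scala without Akka. This is only a sketch: DemandModel and its methods are hypothetical names, not part of akka-stream, and it assumes that an element may only be emitted while outstanding demand is positive. It shows why nothing is delivered as long as demand stays at zero.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an ActorPublisher: it tracks demand in a
// similar way, but it is plain Scala and involves no Akka at all.
final class DemandModel(maxBufferSize: Int) {
  private var demand: Long = 0L             // plays the role of totalDemand
  private var buf = Vector.empty[String]    // elements waiting for demand
  val delivered = ArrayBuffer.empty[String] // what "downstream" has received

  // Downstream signals that it can accept n more elements.
  def request(n: Long): Unit = {
    demand += n
    deliverBuf()
  }

  // Upstream offers an element; returns false if the buffer is full.
  def offer(elem: String): Boolean =
    if (buf.size >= maxBufferSize) false
    else {
      buf :+= elem
      deliverBuf()
      true
    }

  // Emit as many buffered elements as the current demand allows.
  private def deliverBuf(): Unit = {
    val n = math.min(demand, buf.size.toLong).toInt
    val (use, keep) = buf.splitAt(n)
    buf = keep
    demand -= n
    delivered ++= use
  }

  def buffered: Int = buf.size
}
```

With no call to request, every offered element stays in the buffer, which matches the symptom above: the Reader that receives ReadNow sees a totalDemand of zero and only ever buffers.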
