Hello all,

I'm fairly new to Akka but I've been enjoying it so far.  I'm building a 
simple example service that fetches data from a database periodically and 
then uses an ActorPublisher to publish the data.  Everything works well the 
first time around, but once the first result set is processed, the stream 
terminates, but I cannot tell why.  I am not calling onComplete from my 
publisher.  Also it appears that onComplete isn't called on the sink 
either.  Here are the relevant bits:
/**
 * Publisher actor that generates a batch of fake job ids on every timer tick
 * and emits them to the downstream subscriber as demand allows.
 *
 * Jobs are buffered in `jobs` and pushed whenever BOTH jobs and demand are
 * available. That can become true in two ways: the subscriber signals new
 * demand (`Request`), or new jobs are created (`Tick`) while earlier demand is
 * still outstanding. Both paths therefore call `deliverJobs()`; delivering
 * only on `Request` makes the stream stall as soon as the buffer runs dry,
 * because no further `Request` arrives while demand is still unfulfilled.
 *
 * NOTE(review): the buffer is unbounded. If the subscriber is consistently
 * slower than the tick rate, `jobs` grows without limit.
 */
class ServiceDataSource extends ActorPublisher[Int] with ActorLogging {
    case object Tick

    // Jobs created but not yet emitted downstream.
    val jobs = mutable.Queue.empty[Int]
    // First id of the next batch that createFakeWork() will generate.
    var lastJob = 0

    // Handle of the periodic timer; kept so it can be cancelled in postStop().
    var tick: Option[Cancellable] = None
    val tickInterval = 3.seconds

    /** Starts the periodic `Tick` timer, firing immediately and then every `tickInterval`. */
    override def preStart(): Unit = {
        implicit val executionContext = context.dispatcher
        tick = Some(context.system.scheduler.schedule(0.millis, tickInterval, self, Tick))
    }

    /** Cancels the timer so no further `Tick` messages are scheduled for a stopped actor. */
    override def postStop(): Unit = {
        tick.foreach(_.cancel())
        super.postStop()
    }

    def receive = {
        case Tick =>
            createFakeWork()
            // Demand may already be outstanding from an earlier Request.
            deliverJobs()
        case Request(demand) =>
            log.debug("{} Jobs demanded", demand)
            deliverJobs()
        // Fully qualified so the block does not depend on which
        // ActorPublisherMessage members the file imports.
        case akka.stream.actor.ActorPublisherMessage.Cancel =>
            // The subscriber cancelled the stream; nothing left to publish to.
            context.stop(self)
        case unknown => log.warning("got unknown message: {}", unknown)
    }

    /** Appends the next 100 consecutive job ids to the buffer and advances `lastJob`. */
    def createFakeWork(): Unit = {
        log.debug("Creating Jobs...")
        jobs ++= List.range(lastJob, lastJob + 100)
        // Advance unconditionally: the previous `if (lastJob != 0)` guard could
        // never fire, so every batch repeated ids 0..99.
        lastJob += 100
    }

    /** Emits buffered jobs while the publisher is active and demand remains. */
    private def deliverJobs(): Unit = {
        while (jobs.nonEmpty && isActive && totalDemand > 0) {
            onNext(jobs.dequeue())
        }
    }
}


/**
 * Actor that builds and runs the job stream when it starts: an actor-backed
 * publisher source connected to an actor-backed subscriber sink.
 *
 * NOTE(review): `ActorMaterializer()` is created inside the actor, so it
 * presumably resolves the actor's `context` as its implicit factory; if so the
 * stream is tied to this actor's lifetime and ends when the actor stops.
 * Since `receive` stops the actor on ANY message, confirm that is intended.
 *
 * NOTE(review): the source is built from `BackupJobSource`, while the
 * publisher shown in this file is named `ServiceDataSource` - verify that the
 * intended class is referenced.
 */
class ServiceScheduler extends Actor with ActorLogging {
    override def preStart(): Unit = {
        implicit val materializer = ActorMaterializer()

        val jobSource = Source.actorPublisher(Props[BackupJobSource])
        val jobSink = Sink.actorSubscriber(Props[BackupRouter])
        jobSource.runWith(jobSink)
    }

    def receive: Receive = {
        // Placeholder: this actor expects no messages; log and shut down on any.
        case _ =>
            log.error("placeholder catch all")
            self ! PoisonPill
    }
}



Thanks in advance!  I know I'm missing something simple here.

Best,

Devin

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to