Hi Jeremy, did you come across that TIckSource can be cancelled <https://github.com/akka/akka/blob/4e78b368c39c47e5b2c913311d74fa13647c9448/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala#L100-L118> ?
On Fri, Dec 19, 2014 at 5:24 PM, Jeremy Stone < [email protected]> wrote: > > We are planning to use Akka Streams for the next version of our network > monitoring application. Basically the application connects to a number of > configured network devices (through a variety of protocols). For each > device we typically periodically write data to it (e.g. triggered by a > TickSource) and receive responses as well as other unsolicited data which > we scan and process in a number of ways. Essentially we have an inbound and > an outbound flow for each device. > > The application needs to be able to dynamically stop monitoring devices > (e.g. based on time of day or changes to configuration). This means that we > need to be able to tear down flows, thereby closing TCP connections and > freeing other resources. > > However it is not clear how we should do this in the general case... > > I can see how it might be done for custom sources and sinks using > ActorPublisher and ActorSubscriber, but I was hoping that there would be a > way to insert some kind of cancellable 'circuit breaker' into the flow that > would do this and allow use of the 'standard' Source and Sink > implementations (much like takeWithin but controlled externally rather than > being time based). > > I did try to create such a circuit breaker by zipping a source with an > ActorPublisher source that emits an infinite series of elements (see > below), but the resulting zipped source does not complete when cancelled > until (it seems) the main source emits another element. (Surprised the zip > does not complete when one of its input branches does.) > > Any advice/pointers would be greatly appreciated. > > Thanks, > > Jeremy Stone > > object SourceUtils { > def cancellable[T](source: Source[T])(implicit system: ActorSystem): > (Source[T], Cancellable) = { > > val (inf, cancellable) = cancellableInfiniteSource > > (source.zipWith(inf).map(_._1), cancellable) > } > > private def cancellableInfiniteSource(implicit system: ActorSystem): > (Source[Unit], Cancellable) = { > > case object Stop > > class CancellableSource extends ActorPublisher[Unit] with ActorLogging > { > var stopped = false > > def receive = LoggingReceive { > case _: Request => sendAll > case Stop => > sendAll > onComplete() > } > > def sendAll = for (_ <- 1L to totalDemand) onNext(()) > } > > val ref = system.actorOf(Props(new CancellableSource)) > > val cancellableSource = Source(ActorPublisher[Unit](ref)) > > (cancellableSource, new Cancellable { > val cancelled = new AtomicBoolean(false) > def cancel = { > if (!cancelled.getAndSet(true)) { > ref ! Stop > true > } else false > } > def isCancelled = cancelled.get > }) > } > > implicit class MoreSourceOps[T](source: Source[T]) { > def zipWith[U](other: Source[U]): (Source[(T, U)]) = { > > Source() { implicit b: FlowGraphBuilder => > import FlowGraphImplicits._ > > val out = UndefinedSink[(T, U)] > val zip = Zip[T, U] > > source ~> zip.left > other ~> zip.right > > zip.out ~> out > out > } > } > } > } > > -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at http://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- Martynas Mickevičius Typesafe <http://typesafe.com/> – Reactive <http://www.reactivemanifesto.org/> Apps on the JVM -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
