We are planning to use Akka Streams for the next version of our network
monitoring application. Basically the application connects to a number of
configured network devices (through a variety of protocols). For each
device we typically periodically write data to it (e.g. triggered by a
TickSource) and receive responses as well as other unsolicited data which
we scan and process in a number of ways. Essentially we have an inbound and
an outbound flow for each device.
The application needs to be able to dynamically stop monitoring devices
(e.g. based on time of day or changes to configuration). This means that we
need to be able to tear down flows, thereby closing TCP connections and
freeing other resources.
However it is not clear how we should do this in the general case...
I can see how it might be done for custom sources and sinks using
ActorPublisher and ActorSubscriber, but I was hoping that there would be a
way to insert some kind of cancellable 'circuit breaker' into the flow that
would do this and allow use of the 'standard' Source and Sink
implementations (much like takeWithin but controlled externally rather than
being time based).
I did try to create such a circuit breaker by zipping a source with an
ActorPublisher source that emits an infinite series of elements (see
below), but the resulting zipped source does not complete when cancelled
until (it seems) the main source emits another element. (Surprised the zip
does not complete when one of its input branches does.)
Any advice/pointers would be greatly appreciated.
Thanks,
Jeremy Stone
object SourceUtils {
def cancellable[T](source: Source[T])(implicit system: ActorSystem):
(Source[T], Cancellable) = {
val (inf, cancellable) = cancellableInfiniteSource
(source.zipWith(inf).map(_._1), cancellable)
}
private def cancellableInfiniteSource(implicit system: ActorSystem):
(Source[Unit], Cancellable) = {
case object Stop
class CancellableSource extends ActorPublisher[Unit] with ActorLogging {
var stopped = false
def receive = LoggingReceive {
case _: Request => sendAll
case Stop =>
sendAll
onComplete()
}
def sendAll = for (_ <- 1L to totalDemand) onNext(())
}
val ref = system.actorOf(Props(new CancellableSource))
val cancellableSource = Source(ActorPublisher[Unit](ref))
(cancellableSource, new Cancellable {
val cancelled = new AtomicBoolean(false)
def cancel = {
if (!cancelled.getAndSet(true)) {
ref ! Stop
true
} else false
}
def isCancelled = cancelled.get
})
}
implicit class MoreSourceOps[T](source: Source[T]) {
def zipWith[U](other: Source[U]): (Source[(T, U)]) = {
Source() { implicit b: FlowGraphBuilder =>
import FlowGraphImplicits._
val out = UndefinedSink[(T, U)]
val zip = Zip[T, U]
source ~> zip.left
other ~> zip.right
zip.out ~> out
out
}
}
}
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.