I think I've moved one step closer: I now know how to weld a flow
breaker into my graph:

def watch(key: String, waitIndex: Option[Int] = None,
    recursive: Option[Boolean] = None,
    quorum: Option[Boolean] = None): Source[EtcdResponse, Cancellable] = {
  case class WatchRequest(key: String, waitIndex: Option[Int],
    recursive: Option[Boolean], quorum: Option[Boolean])
  val init = WatchRequest(key, waitIndex, recursive, quorum)
  val breaker: Graph[FlowShape[WatchRequest, WatchRequest], Cancellable] = ???
  Source[EtcdResponse, Cancellable](breaker) { implicit b =>
    import FlowGraph.Implicits._
    val initReq = b.add(Source.single(init))
    val reqMerge = b.add(Merge[WatchRequest](2))
    val runWait = b.add(Flow[WatchRequest].mapAsync { req =>
      this.wait(req.key, req.waitIndex, req.recursive, req.quorum).map { resp =>
        (req.copy(waitIndex = Some(resp.node.modifiedIndex + 1)), resp)
      }
    })
    val respUnzip = b.add(Unzip[WatchRequest, EtcdResponse]())
    initReq ~> reqMerge.in(0)
    reqMerge ~> runWait
    runWait ~> respUnzip.in
    breaker => {
      respUnzip.out0 ~> breaker.inlet
      breaker.outlet ~> reqMerge.in(1)
      respUnzip.out1
    }
  }
}

The remaining question: how do I build a Graph[FlowShape[T, T],
Cancellable] that produces a fresh Cancellable on each materialization,
connected to a PushPullStage in such a way that calling cancel() would
"blow the fuse" and terminate the stream?
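
To make the intent concrete, here is a dependency-free sketch of the fuse
behaviour I have in mind. It is not Akka code and does not answer the
materialization question: Fuse, Breaker and FuseDemo are hypothetical names.
Fuse only mirrors the shape of akka.actor.Cancellable (cancel(): Boolean and
isCancelled: Boolean), and the while loop stands in for the feedback edge of
the graph.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in with the same shape as akka.actor.Cancellable;
// it is not part of Akka.
final class Fuse {
  private val blown = new AtomicBoolean(false)
  // Returns true only for the call that actually blows the fuse.
  def cancel(): Boolean = blown.compareAndSet(false, true)
  def isCancelled: Boolean = blown.get()
}

// Hypothetical pass-through logic: forward elements until the fuse is blown.
// Assumption: in a real PushPullStage this check would live in onPush/onPull
// and complete the stream instead of returning None.
final class Breaker[T](fuse: Fuse) {
  def onElement(elem: T): Option[T] =
    if (fuse.isCancelled) None else Some(elem)
}

object FuseDemo {
  // Simulates the feedback loop: every "response" sends the next wait index
  // back through the breaker; the fuse is blown after `cancelAfter` responses.
  def run(cancelAfter: Int): List[Int] = {
    require(cancelAfter >= 1, "cancelAfter must be at least 1")
    val fuse = new Fuse
    val breaker = new Breaker[Int](fuse)
    val seen = List.newBuilder[Int]
    var next: Option[Int] = Some(1)
    var count = 0
    while (next.isDefined) {
      val index = next.get
      seen += index
      count += 1
      if (count == cancelAfter) fuse.cancel()
      // Feedback edge: the next request only exists if the breaker lets it through.
      next = breaker.onElement(index + 1)
    }
    seen.result()
  }
}
```

Placing the breaker on the feedback edge means a response that is already in
flight is still delivered; only the follow-up request is dropped, which is
what lets the loop drain and stop.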
Cheers,
Rafał
On Friday, 24 April 2015 11:53:31 UTC+2, Rafał Krzewski wrote:
>
> Hi,
>
> I've decided to dive into the streams ;) and implemented a client for
> etcd[1] using akka http client. It worked really well, and was a lot of fun!
>
> However, I'm missing one final piece that I wasn't able to figure out. The
> client offers a watch function that returns a stream of EtcdResponses [2].
> Right now the actual returned type is Source[EtcdResponse, Unit], but
> I'd like the materialized value to be an akka.actor.Cancellable
> that would allow the client to shut down the updates stream and release
> its resources. I think a custom PushPullStage exposing the Cancellable
> interface, inserted into the flow's feedback loop, could do the job of
> shutting down the stream, but I couldn't find a way to expose the
> materialized value from the FlowGraph construction block. I was looking at
> TCP streams and TickSource, which do return interesting materialized values,
> but they use low-level private[stream] APIs, so I couldn't adapt any of that
> to my high-level client code. Hints will be appreciated :)
>
> Cheers,
> Rafał
>
> [1] https://github.com/coreos/etcd
> [2]
> https://github.com/rkrzewski/akka-cluster-etcd/blob/master/etcd-client/src/main/scala/pl/caltha/akka/etcd/EtcdClient.scala#L57
>
>