Hi Rafal

On Fri, Apr 24, 2015 at 2:21 PM, Rafał Krzewski <rafal.krzew...@gmail.com>
wrote:

> I think I've moved one step closer: I think I know how to weld a flow
> breaker into my graph:
>
>   def watch(key: String, waitIndex: Option[Int] = None,
>       recursive: Option[Boolean] = None, quorum: Option[Boolean] = None):
>       Source[EtcdResponse, Cancellable] = {
>     case class WatchRequest(key: String, waitIndex: Option[Int],
>         recursive: Option[Boolean], quorum: Option[Boolean])
>     val init = WatchRequest(key, waitIndex, recursive, quorum)
>     val breaker: Graph[FlowShape[WatchRequest, WatchRequest], Cancellable] = ???
>
>     Source[EtcdResponse, Cancellable](breaker) { implicit b =>
>       import FlowGraph.Implicits._
>
>       val initReq = b.add(Source.single(init))
>       val reqMerge = b.add(Merge[WatchRequest](2))
>       val runWait = b.add(Flow[WatchRequest].mapAsync { req =>
>         this.wait(req.key, req.waitIndex, req.recursive, req.quorum).map { resp =>
>           (req.copy(waitIndex = Some(resp.node.modifiedIndex + 1)), resp)
>         }
>       })
>       val respUnzip = b.add(Unzip[WatchRequest, EtcdResponse]())
>
>       initReq ~> reqMerge.in(0)
>       reqMerge ~> runWait
>       runWait ~> respUnzip.in
>
>       breaker => {
>         respUnzip.out0 ~> breaker.inlet
>         breaker.outlet ~> reqMerge.in(1)
>         respUnzip.out1
>       }
>     }
>   }
>
> The question now remains: how do I fabricate a Graph[FlowShape[T, T],
> Cancellable] that will generate an instance of Cancellable on each
> materialization, connected to a PushPullStage in such a way that cancel()
> would "blow the fuse" and terminate the stream?
>

This is currently not possible, because we have not yet exposed the necessary
API to users. The functionality is there internally; we just want to gather a
bit of experience before opening it up.
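
To make the "fuse" idea concrete without touching any stream internals, here
is a minimal plain-Scala sketch. It is only an illustration under assumptions:
the local Cancellable trait is a stand-in for akka.actor.Cancellable, the names
Fuse, guard and FuseDemo are hypothetical, and an Iterator plays the role of
the stream. It shows one shared flag per instance, set by the handle and
checked by the pass-through; it does not show how a materialized value would
be wired into a FlowGraph.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for akka.actor.Cancellable, so the sketch has no dependencies.
trait Cancellable {
  def cancel(): Boolean
  def isCancelled: Boolean
}

// One Fuse per "materialization": the handle and the pass-through share a flag.
final class Fuse extends Cancellable {
  private val blown = new AtomicBoolean(false)

  // Returns true only for the call that actually blows the fuse.
  override def cancel(): Boolean = blown.compareAndSet(false, true)
  override def isCancelled: Boolean = blown.get()

  // Passes elements through unchanged until the fuse is blown, then ends.
  def guard[T](upstream: Iterator[T]): Iterator[T] =
    upstream.takeWhile(_ => !isCancelled)
}

object FuseDemo {
  // Consumes an endless counter and blows the fuse once `limit` has been seen.
  def run(limit: Int): List[Int] = {
    val fuse = new Fuse
    val seen = List.newBuilder[Int]
    fuse.guard(Iterator.from(1)).foreach { n =>
      seen += n
      if (n == limit) fuse.cancel()
    }
    seen.result()
  }

  def main(args: Array[String]): Unit =
    println(run(3))
}
```

Creating a new Fuse per run mirrors getting a fresh Cancellable on each
materialization. Elements seen before cancel() are still delivered; the next
element pulled from upstream after that is dropped and the iteration ends.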

-Endre


>
> Cheers,
> Rafał
>
> On Friday, 24 April 2015 at 11:53:31 UTC+2, Rafał Krzewski wrote:
>>
>> Hi,
>>
>> I've decided to dive into the streams ;) and implemented a client for
>> etcd[1] using akka http client. It worked really well, and was a lot of fun!
>>
>> However, I'm missing one final piece that I wasn't able to figure out:
>> The client offers a watch function that returns a stream of
>> EtcdResponses [2].
>> Right now the actual returned type is Source[EtcdResponse, Unit], but
>> I'd like the materialized value to be akka.actor.Cancellable, which
>> would allow the client to shut down the updates stream and release
>> its resources. I think a custom PushPullStage exposing the Cancellable
>> interface, inserted into the flow's feedback loop, could do the job of
>> shutting down the stream, but I couldn't find a way to expose the
>> materialized value from the FlowGraph construction block. I was looking
>> at TCP streams and TickSource, which do return interesting materialized
>> values, but they use low-level private[stream] APIs, so I couldn't adapt
>> any of that to my high-level client code. Hints will be appreciated :)
>>
>> Cheers,
>> Rafał
>>
>> [1] https://github.com/coreos/etcd
>> [2]
>> https://github.com/rkrzewski/akka-cluster-etcd/blob/master/etcd-client/src/main/scala/pl/caltha/akka/etcd/EtcdClient.scala#L57
>>
>>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
