Awesome thanks a lot. On Sunday, March 6, 2016 at 7:20:43 AM UTC-6, Rafał Krzewski wrote: > > Arun, > > a little correction: > > val runnableGraph = > source.viaMat(counter[Int])(Keep.right).toMat(sink)(Keep.both) > > And subsequently: > > val (counter, futureSum) = runnableGraph.run() > > Graph outlets are always streams. You need to connect them to a Sink > (through intervening Flows or more complex Graphs, as necessary) in order > to create a RunnableGraph. Materialized values are the other things used > to connect the RunnableGraph to the outside world that are *not* streams. > > For example Sink.fold creates a stream element that is (obviously) a Sink. > It does not have any stream outlets. However it provides a materialized > value Future[U] that is completed when the Sink's inlet stream is > exhausted. This is how a running stream can communicate it's successful > completion or failure to the outside world. > > Another example is Source.actorPublisher: you provide it with Props for > an Actor that implements ActorPublisher contract. When materializing the > stream, the Source will instantiate the Actor and return it's ActorRef as > a materialized value. The Actor is internal to the stream but you can use > the ActorRef as an interface from the outside world into the stream: send > messages (using your own protocol) to be passed to the Source's outlet, > according to demand from downstream. The tricky part is that such gateway > Actor must manage buffering and/or backpressure on it's own! > > Besides that, you can use materialized values to monitor stream execution > from the outside, like in the Counter example above or > https://github.com/akka/akka/pull/19836 or to interrupt a stream that > would otherwise run for a long (or unlimited) time: > https://github.com/rkrzewski/akka-cluster-etcd/blob/master/etcd-client/src/main/scala/pl/caltha/akka/streams/FlowBreaker.scala > > Cheers, > Rafał > > W dniu niedziela, 6 marca 2016 08:43:10 UTC+1 użytkownik Arun Sethia > napisał: >> >> Thanks Rafal. >> >> Based on this I tried to make sample code, where I would like to count >> number of elements being processed and their sum: >> >> val source = Source (1 to 5).filter(x=> x%2==0) >> >> val sink:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _) >> >> val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.both) >> >> val result=runnableGraph.run() >> >> >> def counter[T]: Flow[T, T, Counter] = { >> val internalCounter = new AtomicLong(0) >> Flow[T].map{ elem ⇒ >> internalCounter.incrementAndGet() >> elem >> }.mapMaterializedValue(_ ⇒ new Counter{ >> override def get = internalCounter.get >> }) >> } >> >> >> >> 1. using Keep.both, result should able to return me count and sum, but it is >> not? >> >> 2. How materialize values are different than "out"? I am not able to >> visualize the difference between materialize values and out? >> >> Thanks >> Arun >> >> >> >> On Saturday, March 5, 2016 at 6:02:56 PM UTC-6, Arun Sethia wrote: >>> >>> Hi, >>> >>> can some explain what does it mean of materialized value ? I have see >>> documentation at >>> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams >>> >>> >>> I am not sure how Flow can define materialize type, for example the >>> following code has Input - Tweet, output - Int but Mat is Unit. I would >>> like to see how someone can define Mat as Int or any example where Flow or >>> source is defining Mat other than Unit. >>> >>> - val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1) >>> >>> >>> >>> It is quite confusing for me to understand difference between "out" and >>> "Mat". >>> >>> >>> Thanks >>> >>> As >>> >>>
-- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
