Since we have `unfold` and `unfoldAsync` for Source, I'd say it'd be more than symmetric to have `fold` and `foldAsync` on Sink :)
On Sun, Mar 6, 2016 at 12:28 AM, Giovanni Alberto Caporaletti < [email protected]> wrote: > Hi Roland, > you're right, my solution was a bit naive. I came up with this, I'm pretty > sure it can be done in a better way, looking forward to seeing your > solution :) > > def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = > Sink.fromGraph { > val sink = Sink.fold(zero)(Keep.right[U, U]) > > GraphDSL.create(sink) { implicit b => sink => > import GraphDSL.Implicits._ > val zip = b.add(ZipWith(f)) > val bcast = b.add(Broadcast[U](2)) > val merge = b.add(Merge[U](2)) > val z = Source.single(zero) > > z ~> merge > merge ~> zip.in0 > zip.out.mapAsync(1)(identity) ~> bcast ~> sink > merge <~ bcast > > SinkShape(zip.in1) > } > } > > > On Saturday, 5 March 2016 21:52:06 UTC+1, rkuhn wrote: >> >> Unfortunately these solutions create unbounded amounts of futures without >> back pressure, so I'd recommend against this approach. But it is late and >> I'm on the phone so cannot suggest a proper solution. >> >> Regards, >> >> Roland >> >> Sent from my iPhone >> >> On 05 Mar 2016, at 17:41, Giovanni Alberto Caporaletti <[email protected]> >> wrote: >> >> how about this: >> >> def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = { >> Sink >> .fold[Future[U], T](Future.successful(zero))((fu, t) => fu.flatMap(f(_, >> t))) >> .mapMaterializedValue(_ flatMap identity) >> } >> >> or this: >> >> def foldM[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = { >> Flow[T] >> .fold(Future.successful(zero))((fu, t) => fu.flatMap(f(_, t))) >> .mapAsync(1)(identity) >> .toMat(Sink.head)(Keep.right) >> } >> >> >> >> On Saturday, 5 March 2016 17:00:08 UTC+1, Andrew Gaydenko wrote: >>> >>> Hi! There is >>> >>> f: (U, T) ⇒ Future[U] >>> >>> rather than >>> >>> f: (U, T) ⇒ U >>> >>> in hands. How to create >>> >>> Sink[T, Future[U]] >>> >>> ? >>> >> -- >> >>>>>>>>>> Read the docs: http://akka.io/docs/ >> >>>>>>>>>> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user >> --- >> You received this message because you are subscribed to the Google Groups >> "Akka User List" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to [email protected]. >> To post to this group, send email to [email protected]. >> Visit this group at https://groups.google.com/group/akka-user. >> For more options, visit https://groups.google.com/d/optout. >> >> -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at https://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- Cheers, √ -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
