Hi, Iv'e implemented it the "right way" this time, and would like to submit a pull request with the code.
the code I came up with can be found in this gist (feedback would be great): https://gist.github.com/hochgi/927d1ceab88c55fbb0f9 So, how do I submit the PR? (which branch? where do I put the code? where are the tests so I can add tests for it?) Thanks, Gilad. On Friday, November 27, 2015 at 10:34:51 AM UTC+2, Akka Team wrote: > > Hi Barys, > > With the new GraphStage API it is possible. Wait for M2 for new > documentation (the GraphStage is already there in M1) > > -Endre > > On Fri, Nov 27, 2015 at 9:30 AM, Barys Ilyushonak <[email protected] > <javascript:>> wrote: > >> Gilad, thank you for pointing that out. >> >> the one different between Play.Enumerator unfoldM and Iterator - the >> state is Future in the first case. That allows determine the next element >> or final event eventually. >> Is it possible to implement similar behaviour via Source API? >> >> Cheers, >> Boris >> >> On Thursday, November 26, 2015 at 4:50:53 PM UTC+1, Gilad Hoch wrote: >>> >>> iterator + take(n) is not equivalent to unfold. >>> for instance, in my application, I'm using unfold to convert >>> Elasticsearch's scroll to a play Enumerator. >>> >>> thanks, >>> Gilad. >>> >>> On Thursday, November 26, 2015 at 5:41:51 PM UTC+2, √ wrote: >>>> >>>> use take(n) >>>> -- >>>> Cheers, >>>> √ >>>> On 26 Nov 2015 15:35, "Gilad Hoch" <[email protected]> wrote: >>>> >>>>> >>>>> >>>>> OK, >>>>> so how is it possible to generate a finite stream? >>>>> if I can't cut the iterator, what can I do? >>>>> >>>>> BTW, issue is opened: >>>>> https://github.com/akka/akka/issues/19021 >>>>> >>>>> Gilad. >>>>> >>>>> On Thursday, November 26, 2015 at 4:23:00 PM UTC+2, drewhk wrote: >>>>>> >>>>>> Hi Gilad, >>>>>> >>>>>> Exceptions from a stream stage will result in a failure of the >>>>>> overall stream. >>>>>> >>>>>> -Endre >>>>>> >>>>>> On Thu, Nov 26, 2015 at 3:20 PM, Gilad Hoch <[email protected]> >>>>>> wrote: >>>>>> >>>>>>> thanks Endre, >>>>>>> >>>>>>> a simple example to show it does not work as expected, >>>>>>> generating a fibonacci stream for all fib numbers under 10M: >>>>>>> >>>>>>> scala> import akka.stream.scaladsl._ >>>>>>> import akka.stream.scaladsl._ >>>>>>> >>>>>>> scala> import akka.stream.ActorMaterializer >>>>>>> import akka.stream.ActorMaterializer >>>>>>> >>>>>>> scala> import akka.actor.ActorSystem >>>>>>> import akka.actor.ActorSystem >>>>>>> >>>>>>> scala> implicit val system = ActorSystem("akka-stream-experiments") >>>>>>> system: akka.actor.ActorSystem = akka://akka-stream-experiments >>>>>>> >>>>>>> scala> implicit val materializer = ActorMaterializer() >>>>>>> materializer: akka.stream.ActorMaterializer = >>>>>>> ActorMaterializerImpl(akka://akka-stream-experiments,akka.stream.ActorMaterializerSettings@3aab6fda,akka.dispatch.Dispatchers@b75ed3f,Actor[akka://akka-stream-experiments/user/$a#-1004252643],false,0,flow) >>>>>>> >>>>>>> scala> def unfold[S,E](s: S)(f: S => Option[(S,E)]): Source[E,Unit] >>>>>>> = >>>>>>> | Source(() => Iterator.iterate(f(s))(opt => >>>>>>> f(opt.get._1))).map(_.get._2) >>>>>>> unfold: [S, E](s: S)(f: S => Option[(S, >>>>>>> E)])akka.stream.scaladsl.Source[E,Unit] >>>>>>> >>>>>>> scala> unfold(0->1){ >>>>>>> | case (a,b) if a < 10000000 => Some((b->(a+b),a)) >>>>>>> | case _ => None >>>>>>> | } >>>>>>> res0: akka.stream.scaladsl.Source[Int,Unit] = >>>>>>> akka.stream.scaladsl.Source@404ef48b >>>>>>> >>>>>>> scala> res0.runForeach(println) >>>>>>> res1: scala.concurrent.Future[Unit] = >>>>>>> scala.concurrent.impl.Promise$DefaultPromise@14bc1d08 >>>>>>> >>>>>>> scala> 0 >>>>>>> 1 >>>>>>> 1 >>>>>>> 2 >>>>>>> 3 >>>>>>> 5 >>>>>>> 8 >>>>>>> 13 >>>>>>> 21 >>>>>>> 34 >>>>>>> 55 >>>>>>> 89 >>>>>>> 144 >>>>>>> 233 >>>>>>> 377 >>>>>>> 610 >>>>>>> 987 >>>>>>> 1597 >>>>>>> 2584 >>>>>>> 4181 >>>>>>> 6765 >>>>>>> 10946 >>>>>>> 17711 >>>>>>> 28657 >>>>>>> >>>>>>> >>>>>>> scala> res1.isCompleted >>>>>>> res2: Boolean = true >>>>>>> >>>>>>> so, as you see, not all the numbers are printed. >>>>>>> if I use the unfold implementation from the gist, it works as >>>>>>> expected. >>>>>>> >>>>>>> I guess it has to do with the way I stop. I'm mapping the option to >>>>>>> a get call, >>>>>>> so I guess akka stream doesn't like exceptions thrown from within a >>>>>>> map call. >>>>>>> >>>>>>> Gilad. >>>>>>> >>>>>>> P.S. thanks, I'll open a ticket now :) >>>>>>> >>>>>>> On Thursday, November 26, 2015 at 3:15:05 PM UTC+2, Akka Team wrote: >>>>>>>> >>>>>>>> Hi Gilad, >>>>>>>> >>>>>>>> On Thu, Nov 26, 2015 at 1:56 PM, Gilad Hoch <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> I'm thinking of migrating some code I have written with play's >>>>>>>>> iteratees module, to akka-stream. >>>>>>>>> I'm lacking many useful constructs iteratees have, e.g unfold & >>>>>>>>> unfoldM generators. >>>>>>>>> >>>>>>>>> Iv'e tried doing something naive at first: >>>>>>>>> >>>>>>>>> def unfold[S,E](s: S)(f: S => Option[(S,E)]): Source[E,Unit] = >>>>>>>>> Source(() => Iterator.iterate(f(s))(opt => f(opt.get._1))) >>>>>>>>> .map(_.get._2) >>>>>>>>> >>>>>>>>> def unfoldM[S,E](s: S)(g: S => Future[Option[(S,E)]])(implicit ec: >>>>>>>>> ExecutionContext): Source[E,Unit] = >>>>>>>>> Source(() => Iterator.iterate(g(s))(_.flatMap(opt => g(opt.get >>>>>>>>> ._1)))) >>>>>>>>> .mapAsync(1)(_.map(_.get._2)) >>>>>>>>> >>>>>>>>> but it didn't worked very well (don't know why). >>>>>>>>> >>>>>>>> >>>>>>>> Can you give us a testcase that fails with the above? There should >>>>>>>> be no problem implementing these signatures (apart from that your >>>>>>>> above >>>>>>>> code does not handle the Option and therefore does not stop). >>>>>>>> >>>>>>>> >>>>>>>>> so I implemented it using actorPublisher instead. >>>>>>>>> >>>>>>>>> code can be found at this gist: >>>>>>>>> https://gist.github.com/hochgi/cbe5ffc6cf2915e31091 >>>>>>>>> >>>>>>>>> this seems to work fine, and it wasn't too hard to implement, >>>>>>>>> so it raises some questions. >>>>>>>>> >>>>>>>> >>>>>>>> Hmm, for these tasks, if the build in operations are not good >>>>>>>> enough you are better of creating a custom stage: >>>>>>>> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M1/scala/stream-customize.html#custom-linear-processing-stages >>>>>>>> >>>>>>>> That does not work yet with asynchronous processing (futures in >>>>>>>> your case) but that will be possible with the new GraphStages which I >>>>>>>> am >>>>>>>> documenting right now. >>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>>> 1. I may think the implementation is good, but is it? I'm no >>>>>>>>> expert... >>>>>>>>> 2. if it is so trivial, why it's not in the official API? I think >>>>>>>>> it should be part of the API. >>>>>>>>> >>>>>>>> >>>>>>>> Because >>>>>>>> - we can't come up with all the possible combinators people might >>>>>>>> ever need. Open a ticket if you feel something is missing, and then we >>>>>>>> consider it for inclusion >>>>>>>> - we are very conservative about adding combinators to avoid >>>>>>>> building a kitchen sink. I understand it feels better to have a >>>>>>>> one-liner >>>>>>>> for everything, but Akka Streams is very extensible (more and more >>>>>>>> every >>>>>>>> day) and in the long term that is what matters because no framework >>>>>>>> can >>>>>>>> provide all the operators that people will ever need. That said, >>>>>>>> unfold is >>>>>>>> one nice addition, so please open a ticket. >>>>>>>> >>>>>>>> -Endre >>>>>>>> >>>>>>>> >>>>>>>>> especially since I'm probably not the only one migrating iteratees >>>>>>>>> code to akka streams... >>>>>>>>> >>>>>>>>> thanks, >>>>>>>>> Gilad. >>>>>>>>> >>>>>>>>> -- >>>>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>> >>>>>>>>>> Check the FAQ: >>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>> >>>>>>>>>> Search the archives: >>>>>>>>> https://groups.google.com/group/akka-user >>>>>>>>> --- >>>>>>>>> You received this message because you are subscribed to the Google >>>>>>>>> Groups "Akka User List" group. >>>>>>>>> To unsubscribe from this group and stop receiving emails from it, >>>>>>>>> send an email to [email protected]. >>>>>>>>> To post to this group, send email to [email protected]. >>>>>>>>> Visit this group at http://groups.google.com/group/akka-user. >>>>>>>>> For more options, visit https://groups.google.com/d/optout. >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> Akka Team >>>>>>>> Typesafe - Reactive apps on the JVM >>>>>>>> Blog: letitcrash.com >>>>>>>> Twitter: @akkateam >>>>>>>> >>>>>>> -- >>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>> >>>>>>>>>> Check the FAQ: >>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>> >>>>>>>>>> Search the archives: >>>>>>> https://groups.google.com/group/akka-user >>>>>>> --- >>>>>>> You received this message because you are subscribed to the Google >>>>>>> Groups "Akka User List" group. >>>>>>> To unsubscribe from this group and stop receiving emails from it, >>>>>>> send an email to [email protected]. >>>>>>> To post to this group, send email to [email protected]. >>>>>>> Visit this group at http://groups.google.com/group/akka-user. >>>>>>> For more options, visit https://groups.google.com/d/optout. >>>>>>> >>>>>> >>>>>> -- >>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>> >>>>>>>>>> Check the FAQ: >>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>> >>>>>>>>>> Search the archives: >>>>> https://groups.google.com/group/akka-user >>>>> --- >>>>> You received this message because you are subscribed to the Google >>>>> Groups "Akka User List" group. >>>>> To unsubscribe from this group and stop receiving emails from it, send >>>>> an email to [email protected]. >>>>> To post to this group, send email to [email protected]. >>>>> Visit this group at http://groups.google.com/group/akka-user. >>>>> For more options, visit https://groups.google.com/d/optout. >>>>> >>>> -- >> >>>>>>>>>> Read the docs: http://akka.io/docs/ >> >>>>>>>>>> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user >> --- >> You received this message because you are subscribed to the Google Groups >> "Akka User List" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to [email protected] <javascript:>. >> To post to this group, send email to [email protected] >> <javascript:>. >> Visit this group at http://groups.google.com/group/akka-user. >> For more options, visit https://groups.google.com/d/optout. >> > > > > -- > Akka Team > Typesafe - Reactive apps on the JVM > Blog: letitcrash.com > Twitter: @akkateam > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
