use take(n) -- Cheers, √ On 26 Nov 2015 15:35, "Gilad Hoch" <[email protected]> wrote:
> > > OK, > so how is it possible to generate a finite stream? > if I can't cut the iterator, what can I do? > > BTW, issue is opened: > https://github.com/akka/akka/issues/19021 > > Gilad. > > On Thursday, November 26, 2015 at 4:23:00 PM UTC+2, drewhk wrote: >> >> Hi Gilad, >> >> Exceptions from a stream stage will result in a failure of the overall >> stream. >> >> -Endre >> >> On Thu, Nov 26, 2015 at 3:20 PM, Gilad Hoch <[email protected]> wrote: >> >>> thanks Endre, >>> >>> a simple example to show it does not work as expected, >>> generating a fibonacci stream for all fib numbers under 10M: >>> >>> scala> import akka.stream.scaladsl._ >>> import akka.stream.scaladsl._ >>> >>> scala> import akka.stream.ActorMaterializer >>> import akka.stream.ActorMaterializer >>> >>> scala> import akka.actor.ActorSystem >>> import akka.actor.ActorSystem >>> >>> scala> implicit val system = ActorSystem("akka-stream-experiments") >>> system: akka.actor.ActorSystem = akka://akka-stream-experiments >>> >>> scala> implicit val materializer = ActorMaterializer() >>> materializer: akka.stream.ActorMaterializer = >>> ActorMaterializerImpl(akka://akka-stream-experiments,akka.stream.ActorMaterializerSettings@3aab6fda >>> ,akka.dispatch.Dispatchers@b75ed3f >>> ,Actor[akka://akka-stream-experiments/user/$a#-1004252643],false,0,flow) >>> >>> scala> def unfold[S,E](s: S)(f: S => Option[(S,E)]): Source[E,Unit] = >>> | Source(() => Iterator.iterate(f(s))(opt => >>> f(opt.get._1))).map(_.get._2) >>> unfold: [S, E](s: S)(f: S => Option[(S, >>> E)])akka.stream.scaladsl.Source[E,Unit] >>> >>> scala> unfold(0->1){ >>> | case (a,b) if a < 10000000 => Some((b->(a+b),a)) >>> | case _ => None >>> | } >>> res0: akka.stream.scaladsl.Source[Int,Unit] = >>> akka.stream.scaladsl.Source@404ef48b >>> >>> scala> res0.runForeach(println) >>> res1: scala.concurrent.Future[Unit] = >>> scala.concurrent.impl.Promise$DefaultPromise@14bc1d08 >>> >>> scala> 0 >>> 1 >>> 1 >>> 2 >>> 3 >>> 5 >>> 8 >>> 13 >>> 21 >>> 34 >>> 55 >>> 89 >>> 144 >>> 233 >>> 377 >>> 610 >>> 987 >>> 1597 >>> 2584 >>> 4181 >>> 6765 >>> 10946 >>> 17711 >>> 28657 >>> >>> >>> scala> res1.isCompleted >>> res2: Boolean = true >>> >>> so, as you see, not all the numbers are printed. >>> if I use the unfold implementation from the gist, it works as expected. >>> >>> I guess it has to do with the way I stop. I'm mapping the option to a >>> get call, >>> so I guess akka stream doesn't like exceptions thrown from within a map >>> call. >>> >>> Gilad. >>> >>> P.S. thanks, I'll open a ticket now :) >>> >>> On Thursday, November 26, 2015 at 3:15:05 PM UTC+2, Akka Team wrote: >>>> >>>> Hi Gilad, >>>> >>>> On Thu, Nov 26, 2015 at 1:56 PM, Gilad Hoch <[email protected]> wrote: >>>> >>>>> Hi, >>>>> >>>>> I'm thinking of migrating some code I have written with play's >>>>> iteratees module, to akka-stream. >>>>> I'm lacking many useful constructs iteratees have, e.g unfold & >>>>> unfoldM generators. >>>>> >>>>> Iv'e tried doing something naive at first: >>>>> >>>>> def unfold[S,E](s: S)(f: S => Option[(S,E)]): Source[E,Unit] = >>>>> Source(() => Iterator.iterate(f(s))(opt => f(opt.get._1))) >>>>> .map(_.get._2) >>>>> >>>>> def unfoldM[S,E](s: S)(g: S => Future[Option[(S,E)]])(implicit ec: >>>>> ExecutionContext): Source[E,Unit] = >>>>> Source(() => Iterator.iterate(g(s))(_.flatMap(opt => g(opt.get._1 >>>>> )))) >>>>> .mapAsync(1)(_.map(_.get._2)) >>>>> >>>>> but it didn't worked very well (don't know why). >>>>> >>>> >>>> Can you give us a testcase that fails with the above? There should be >>>> no problem implementing these signatures (apart from that your above code >>>> does not handle the Option and therefore does not stop). >>>> >>>> >>>>> so I implemented it using actorPublisher instead. >>>>> >>>>> code can be found at this gist: >>>>> https://gist.github.com/hochgi/cbe5ffc6cf2915e31091 >>>>> >>>>> this seems to work fine, and it wasn't too hard to implement, >>>>> so it raises some questions. >>>>> >>>> >>>> Hmm, for these tasks, if the build in operations are not good enough >>>> you are better of creating a custom stage: >>>> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M1/scala/stream-customize.html#custom-linear-processing-stages >>>> >>>> That does not work yet with asynchronous processing (futures in your >>>> case) but that will be possible with the new GraphStages which I am >>>> documenting right now. >>>> >>>> >>>>> >>>>> 1. I may think the implementation is good, but is it? I'm no expert... >>>>> 2. if it is so trivial, why it's not in the official API? I think it >>>>> should be part of the API. >>>>> >>>> >>>> Because >>>> - we can't come up with all the possible combinators people might ever >>>> need. Open a ticket if you feel something is missing, and then we consider >>>> it for inclusion >>>> - we are very conservative about adding combinators to avoid building >>>> a kitchen sink. I understand it feels better to have a one-liner for >>>> everything, but Akka Streams is very extensible (more and more every day) >>>> and in the long term that is what matters because no framework can provide >>>> all the operators that people will ever need. That said, unfold is one nice >>>> addition, so please open a ticket. >>>> >>>> -Endre >>>> >>>> >>>>> especially since I'm probably not the only one migrating iteratees >>>>> code to akka streams... >>>>> >>>>> thanks, >>>>> Gilad. >>>>> >>>>> -- >>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>> >>>>>>>>>> Check the FAQ: >>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>> >>>>>>>>>> Search the archives: >>>>> https://groups.google.com/group/akka-user >>>>> --- >>>>> You received this message because you are subscribed to the Google >>>>> Groups "Akka User List" group. >>>>> To unsubscribe from this group and stop receiving emails from it, send >>>>> an email to [email protected]. >>>>> To post to this group, send email to [email protected]. >>>>> Visit this group at http://groups.google.com/group/akka-user. >>>>> For more options, visit https://groups.google.com/d/optout. >>>>> >>>> >>>> >>>> >>>> -- >>>> Akka Team >>>> Typesafe - Reactive apps on the JVM >>>> Blog: letitcrash.com >>>> Twitter: @akkateam >>>> >>> -- >>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>> >>>>>>>>>> Check the FAQ: >>> http://doc.akka.io/docs/akka/current/additional/faq.html >>> >>>>>>>>>> Search the archives: >>> https://groups.google.com/group/akka-user >>> --- >>> You received this message because you are subscribed to the Google >>> Groups "Akka User List" group. >>> To unsubscribe from this group and stop receiving emails from it, send >>> an email to [email protected]. >>> To post to this group, send email to [email protected]. >>> Visit this group at http://groups.google.com/group/akka-user. >>> For more options, visit https://groups.google.com/d/optout. >>> >> >> -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at http://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
