Thanks Endre,

Here is a simple example showing that it does not work as expected.
It generates a stream of all Fibonacci numbers under 10M:

scala> import akka.stream.scaladsl._
import akka.stream.scaladsl._

scala> import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializer

scala> import akka.actor.ActorSystem
import akka.actor.ActorSystem

scala> implicit val system = ActorSystem("akka-stream-experiments")
system: akka.actor.ActorSystem = akka://akka-stream-experiments

scala> implicit val materializer = ActorMaterializer()
materializer: akka.stream.ActorMaterializer = 
ActorMaterializerImpl(akka://akka-stream-experiments,akka.stream.ActorMaterializerSettings@3aab6fda,akka.dispatch.Dispatchers@b75ed3f,Actor[akka://akka-stream-experiments/user/$a#-1004252643],false,0,flow)

scala> def unfold[S,E](s: S)(f: S => Option[(S,E)]): Source[E,Unit] = 
     |     Source(() => Iterator.iterate(f(s))(opt => f(opt.get._1))).map(_.get._2)
unfold: [S, E](s: S)(f: S => Option[(S, E)])akka.stream.scaladsl.Source[E,Unit]

scala> unfold(0->1){
     | case (a,b) if a < 10000000 => Some((b->(a+b),a))
     | case _ => None
     | }
res0: akka.stream.scaladsl.Source[Int,Unit] = 
akka.stream.scaladsl.Source@404ef48b

scala> res0.runForeach(println)
res1: scala.concurrent.Future[Unit] = 
scala.concurrent.impl.Promise$DefaultPromise@14bc1d08

scala> 0
1
1
2
3
5
8
13
21
34
55
89
144
233
377
610
987
1597
2584
4181
6765
10946
17711
28657


scala> res1.isCompleted
res2: Boolean = true
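
A note on reading the check above: `isCompleted` on a Scala Future only says that the future has finished, not that it finished successfully, so `true` is also consistent with a stream that failed. A minimal sketch using only the standard library (the exception here is a stand-in chosen for illustration, not one captured from the session above):

```scala
import scala.concurrent.Future
import scala.util.{Failure, Success}

// A future that has already failed; the exception is an illustrative stand-in.
val failed: Future[Unit] = Future.failed(new NoSuchElementException("None.get"))

// isCompleted does not distinguish success from failure.
val done: Boolean = failed.isCompleted

// `value` exposes the actual outcome once the future has completed.
val outcome: String = failed.value match {
  case Some(Success(_))  => "succeeded"
  case Some(Failure(ex)) => s"failed with ${ex.getClass.getSimpleName}"
  case None              => "not completed yet"
}
```

Inspecting `res1.value` in the REPL in the same way would show whether the stream ended normally or with an error.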

So, as you can see, not all the numbers are printed: the output stops at 28657, well short of 10M.
If I use the unfold implementation from the gist, it works as expected.

I guess it has to do with the way I stop. I'm mapping each Option with a get call,
so I guess Akka Streams doesn't like exceptions thrown from within a map call.
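
One possible way to stop without ever calling get on a None, sketched on a plain Iterator so it runs without Akka. The helper name `unfoldIterator` is hypothetical (it is not from the gist or from Akka), and this is only an illustration of ending the iteration at the first None, not a confirmed fix for the behaviour above:

```scala
// Hypothetical helper: unfold over a plain Iterator that ends cleanly
// when f returns None, instead of throwing from Option.get.
def unfoldIterator[S, E](s: S)(f: S => Option[(S, E)]): Iterator[E] =
  Iterator
    .iterate(f(s))(_.flatMap { case (next, _) => f(next) }) // a None stays None
    .takeWhile(_.isDefined)                                  // stop at the first None
    .map(_.get._2)                                           // only Some values reach this point

// Same Fibonacci step function as in the REPL session above.
val fibs: List[Int] = unfoldIterator(0 -> 1) {
  case (a, b) if a < 10000000 => Some((b -> (a + b), a))
  case _                      => None
}.toList
```

Assuming the same `Source(() => iterator)` constructor used earlier in this message, the iterator could presumably be wrapped as `Source(() => unfoldIterator(s)(f))`, so that the stream completes normally when the iterator runs out.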

Gilad.

P.S. Thanks, I'll open a ticket now :)

On Thursday, November 26, 2015 at 3:15:05 PM UTC+2, Akka Team wrote:
>
> Hi Gilad,
>
> On Thu, Nov 26, 2015 at 1:56 PM, Gilad Hoch <[email protected]> wrote:
>
>> Hi,
>>
>> I'm thinking of migrating some code I have written with Play's iteratees 
>> module to akka-stream.
>> I'm missing many useful constructs that iteratees have, e.g. the unfold & 
>> unfoldM generators.
>>
>> I've tried doing something naive at first:
>>
>> def unfold[S,E](s: S)(f: S => Option[(S,E)]): Source[E,Unit] = 
>>     Source(() => Iterator.iterate(f(s))(opt => f(opt.get._1)))
>>       .map(_.get._2)
>>
>> def unfoldM[S,E](s: S)(g: S => Future[Option[(S,E)]])(implicit ec: ExecutionContext): Source[E,Unit] = 
>>     Source(() => Iterator.iterate(g(s))(_.flatMap(opt => g(opt.get._1))))
>>       .mapAsync(1)(_.map(_.get._2))
>>
>> but it didn't work very well (I don't know why).
>>
>
> Can you give us a test case that fails with the above? There should be no 
> problem implementing these signatures (apart from the fact that your code 
> above does not handle the Option and therefore does not stop).
>  
>
>> so I implemented it using actorPublisher instead.
>>
>> code can be found at this gist: 
>> https://gist.github.com/hochgi/cbe5ffc6cf2915e31091
>>
>> this seems to work fine, and it wasn't too hard to implement,
>> so it raises some questions.
>>
>
> Hmm, for these tasks, if the built-in operations are not good enough you 
> are better off creating a custom stage: 
> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M1/scala/stream-customize.html#custom-linear-processing-stages
>
> That does not work yet with asynchronous processing (futures in your case) 
> but that will be possible with the new GraphStages which I am documenting 
> right now.
>  
>
>>
>> 1. I may think the implementation is good, but is it? I'm no expert...
>> 2. if it is so trivial, why isn't it in the official API? I think it 
>> should be part of the API. 
>>
>
> Because 
>  - we can't come up with all the possible combinators people might ever 
> need. Open a ticket if you feel something is missing, and then we consider 
> it for inclusion
>  - we are very conservative about adding combinators to avoid building a 
> kitchen sink. I understand it feels better to have a one-liner for 
> everything, but Akka Streams is very extensible (more and more every day) 
> and in the long term that is what matters because no framework can provide 
> all the operators that people will ever need. That said, unfold is one nice 
> addition, so please open a ticket.
>
> -Endre
>  
>
>> especially since I'm probably not the only one migrating iteratees code 
>> to akka streams...
>>
>> thanks,
>> Gilad.
>>
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> Akka Team
> Typesafe - Reactive apps on the JVM
> Blog: letitcrash.com
> Twitter: @akkateam
>
