In the Activator example for groupBy, we see this:

    Source(() => logFile.getLines()).
      // group them by log level
      groupBy {
        case LoglevelPattern(level) => level
        case other => "OTHER"
      }.
      // write lines of each group to a separate file
      foreach {
        case (level, groupFlow) =>
          val output = new PrintWriter(new FileOutputStream(s"target/log-$level.txt"), true)
          // close resource when the group stream is completed
          // foreach returns a future that we can key the close() off of
          groupFlow.foreach(line => output.println(line)).onComplete(_ => Try(output.close()))
      }.
      onComplete { _ =>
        Try(logFile.close())
        system.shutdown()
      }

I have been following this pattern, but I am finding that the Future returned
by the foreach on the groupBy completes BEFORE the groupFlows have finished
processing. As a result, several messages are still unprocessed at that point.
I've observed this issue in akka-stream-experimental M1 and M2.
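
To show the ordering I mean without depending on akka-stream, here is a minimal sketch using only scala.concurrent. The names OuterBeforeInner, processGroup and gate are made up for illustration, and this is not how akka-stream implements groupBy. It only shows that a future for the outer step can complete while the per-group futures are still pending, and that composing them with Future.sequence gives one future that waits for all of them.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object OuterBeforeInner {
  // Hypothetical stand-in for one group's processing:
  // it completes only once `gate` has completed.
  def processGroup(level: String, gate: Future[Unit]): Future[String] =
    gate.map(_ => level)

  // Returns (was any group finished when the outer step completed?, final results).
  def run(): (Boolean, List[String]) = {
    val gate = Promise[Unit]()
    val levels = List("DEBUG", "INFO", "OTHER")

    // Outer step: starts the per-group work and hands back the inner futures.
    val outer: Future[List[Future[String]]] =
      Future(levels.map(level => processGroup(level, gate.future)))

    // The outer future is complete here, but the groups are still blocked on `gate`.
    val inner = Await.result(outer, 5.seconds)
    val anyGroupFinishedEarly = inner.exists(_.isCompleted)

    // Let the groups finish, then wait on a single future that covers all of them.
    gate.success(())
    val all: Future[List[String]] = Future.sequence(inner)
    (anyGroupFinishedEarly, Await.result(all, 5.seconds))
  }

  def main(args: Array[String]): Unit = {
    val (early, results) = run()
    println(s"any group finished before outer completed: $early")
    println(s"results after waiting on all groups: $results")
  }
}
```

In this sketch, shutting down after `outer` alone would cut off the groups, whereas keying the shutdown off the sequenced future would not.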

I have created the following gist which demonstrates the problem:

https://gist.github.com/timcharper/9824eee567f24b4205f3

Is the Activator example a bad example of how to use streams? Or is Akka
Streams misbehaving? (Is it impossible for a group stream to know that its
groupFlows are completed?)

Thanks,

Tim
