Before I assume and report this as a bug, I wanted to see if there was 
something I was missing regarding the error handling with Akka streams 
here. I am surprised to see two different outputs for the two example 
streams:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}

import scala.collection.immutable.Seq
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object MapMergeConcatError extends App {
  implicit val system = ActorSystem("Main")
  implicit val materializer = ActorMaterializer()
  implicit val ec = system.dispatcher

  val subFlow = {
    Flow[Int]
      .mapAsyncUnordered(5)(i => Future {
        if (i == 4) sys.error("☠")
        i * 5
      })
      
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
      .reduce(_ + _)
  }

  val subStreamFuture = Source(Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6)))
    .flatMapMerge(5, m => Source.single(m).mapConcat(identity).via(subFlow))
    
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
    .runWith(Sink.seq)

  val mapAsyncFuture = Source(Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6)))
    .mapAsyncUnordered(5)(m => 
Source.single(m).mapConcat(identity).via(subFlow).runWith(Sink.head))
    
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
    .runWith(Sink.seq)

  val f1 = Await.ready(subStreamFuture, 10.seconds)
  val f2 = Await.ready(mapAsyncFuture, 10.seconds)

  println(s"Using flatMapMerge: $f1")
  println(s"Using mapAsyncUnordered: $f2")

  system.terminate()
}

The output is:
Using flatMapMerge: Future(Failure(java.lang.RuntimeException: ☠))
Using mapAsyncUnordered: Future(Success(Vector(30, 15)))
The mapAsyncUnordered output is the desired output (the whole item from the 
top-level is dropped when there is a failure in the sub-flow, but the other 
items that did not have a failure in the sub-flow make it through).

Is this a bug, or something subtle that could be explained?

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to