As promised, I tested it and come back with some results :

This code works as expected :
 - when an error is transmitted from the source via the onError channel of 
the stream:
  - The source is recreated 2 times,
    and the message "Source error management" is printed.
  - The third time, the stream collapses as expected.
 
    implicit val materializer = ActorMaterializer(ActorMaterializerSettings(
system2)


    // source declaration
    val publiString: Publisher[String] = MongoClients.create(mongoSettings).
listDatabaseNames()
    val source: Source[String, NotUsed] = Source.fromPublisher(publiString)


    // source supervision
    val source2 = source.recoverWithRetries(
      2,
      {
        case _ => {
          println("Source error management")
          // declare a new source
          val sourceBis: Source[String, NotUsed] =
            Source.fromPublisher(MongoClients.create(mongoSettings).
listDatabaseNames())
          sourceBis
        }
      }
    )


    // define a printing stage
    val printFlow: Flow[String, String, NotUsed] = Flow.fromFunction({ s => 
println(s); s })


    // wire the source with the printing stage and a sink, run it and 
terminate once done.
    source2.via(printFlow).runWith(Sink.ignore).onComplete(_ => system2.
terminate())



to wrap it up :


# To have exhaustive error management:

## One should define a Supervision.Decider

    val errorDecider: Supervision.Decider = {
      case e => {
        println("Stream error management" + e)
        Supervision.resume
      }
    }


 and associate it with the ActorMaterializer, (or with individua stream 
processing stages) :
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(
system2)
      .withSupervisionStrategy(errorDecider))


The Supervision.Decider
deals with :
 - errors produced inside explicitely defined akka Flows or junctions,
but does not deal with :
 - errors produced by ZipWith, GraphStage, actorPublisher, ActorSubscriber, 
and Source.fromPublisher() sources.


## One should define some recoverWith partial functions

For stages that do not support Supervision.Decider:

someSupervisedStage = someStage.recoverWithRetries(
      2,
      {
        case _ => {
          println("Stage error management")
          [recover logic producing a new source]
        }
      }




Is that correct ?



Thank you again for your help and patience Konrad.



Best regards,
Antonin PA

Le mardi 21 février 2017 17:20:15 UTC+1, Konrad Malawski a écrit :
>
> Thanks, that's good feedback - we should include mention of the recover 
> methods in there more prominently :)
>
> -- 
> Konrad `ktoso` Malawski
> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>
> On 22 February 2017 at 01:14:35, antonin perrot-audet (perro...@gmail.com 
> <javascript:>) wrote:
>
> Thank you Konrad, I think that this is what I am looking for, I'll check 
> them out and let you know. 
>
> I was blindly following the 7.12 "error handling" chapter of the doc, and 
> though that defining a strategy for managing errors in one place this way 
> was quite neat.
>
> Best,
>
> Antonin PA
>
> Le 21 févr. 2017 16:48, "Konrad Malawski" <konrad....@lightbend.com 
> <javascript:>> a écrit :
>
>> I don't think you need supervision for those things actually - have you 
>> looked at the various `recover*` methods defined on Source/Flow?
>> You can recover from failures using those.
>>
>> -- 
>> Konrad `ktoso` Malawski
>> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>>
>> On 22 February 2017 at 00:47:01, antonin perrot-audet (perro...@gmail.com 
>> <javascript:>) wrote:
>>
>> Hi Konrad,
>>
>> thanks a lot, your answer does help a lot.
>>
>> - My bad, the onError from MongoDb publisher returns normally but passes 
>> an Exception object, their implementation seems to be correct according to 
>> the reactive stream spec.
>> - Ok, so the supervision does not work for the source created via 
>> fromPublisher(), is there a way to catch this propagated failure somewhere 
>> else in the stream ? Would it be better to wrap the publisher in an actor ?
>>
>> Best,
>> Antonin PA
>>
>>
>> Le mardi 21 février 2017 16:37:37 UTC+1, Konrad Malawski a écrit : 
>>>
>>> Hi Antonin,
>>> Two things here:
>>>
>>> One: "throws an error on the onError callback"? This is not allowed by 
>>> the spec:
>>>
>>> Calling onSubscribe, onNext, onError or onComplete MUST return normally 
>>> except when any provided parameter is null in which case it MUST throw 
>>> a java.lang.NullPointerException to the caller, for all other 
>>> situations the only legal way for a Subscriber to signal failure is by 
>>> cancelling its Subscription. In the case that this rule is violated, 
>>> any associated Subscription to the Subscriber MUST be considered as 
>>> cancelled, and the caller MUST raise this error condition in a fashion that 
>>> is adequate for the runtime environment.
>>> https://github.com/reactive-streams/reactive-streams-jvm#2.13
>>>
>>> Is that mongo implementation surely tested and conforming to the 
>>> reactive streams spec?
>>>
>>>
>>> Two: Supervision does not work for arbitrary 3rd party publishers - it 
>>> only works within Akka Streams (specific stages, specifically handle it), 
>>> it's an additional feature Akka Streams provide over what Reactive Streams 
>>> do. 
>>>
>>> Hope this helps
>>>
>>> -- 
>>> Konrad `ktoso` Malawski
>>> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>>>
>>> On 22 February 2017 at 00:04:06, antonin perrot-audet (
>>> perro...@gmail.com) wrote:
>>>
>>> Hello,
>>>
>>> has anyone succeded at having a source fromPublisher() conform to the 
>>> SupervisionStrategy defined in the ActorMaterializer ?
>>>
>>>
>>> I have that publisher  :
>>> MongoClients.create(mongoSettings).listDatabaseNames()
>>> That throws an exception on the onError callback, but the errorDecider 
>>> never gets called. The stream colapses on a failed Future :
>>>
>>>
>>>     implicit val system2 = ActorSystem("Sys2")
>>>     import system2.dispatcher
>>>
>>>     val errorDecider : Supervision.Decider = {
>>>       case _ => {
>>>         println("errorDecider does something, yeiii !")
>>>         Supervision.stop}
>>>     }
>>>
>>>     implicit val materializer = ActorMaterializer(
>>> ActorMaterializerSettings(system2)
>>>       .withSupervisionStrategy(errorDecider))
>>>
>>>     val publiString : Publisher[String] = MongoClients.create(
>>> mongoSettings).listDatabaseNames()
>>>     val stream: Source[String, NotUsed] = Source.fromPublisher(
>>> publiString)
>>>
>>>     stream.map( {s => println(s);s}).runWith(Sink.ignore).onComplete(_ 
>>> => system2.terminate())
>>>
>>>
>>>
>>> thanks in advace for your responses.
>>>
>>> Best,
>>>
>>> Antonin PA
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: 
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives: 
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google 
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send 
>>> an email to akka-user+...@googlegroups.com.
>>> To post to this group, send email to akka...@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com <javascript:>.
>> To post to this group, send email to akka...@googlegroups.com 
>> <javascript:>.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to