Hi Antonin,
Two things here:

One: "throws an error on the onError callback"? This is not allowed by the
spec:

Calling onSubscribe, onNext, onError or onComplete MUST return normally
except when any provided parameter is null in which case it MUST throw a
java.lang.NullPointerException to the caller, for all other situations the
only legal way for a Subscriber to signal failure is by cancelling its
Subscription. In the case that this rule is violated, any associated
Subscription to the Subscriber MUST be considered as cancelled, and the
caller MUST raise this error condition in a fashion that is adequate for
the runtime environment.
https://github.com/reactive-streams/reactive-streams-jvm#2.13
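
To make the rule concrete, here is a minimal sketch of a Subscriber whose
signal methods always return normally. SafeSubscriber, handle and onFailure
are hypothetical names made up for this illustration. It is not a complete
spec-compliant implementation (for example, it does not guard against a
second onSubscribe); it only shows failures being signalled by cancelling
the Subscription instead of throwing:

```scala
import org.reactivestreams.{Subscriber, Subscription}
import scala.util.control.NonFatal

// Hypothetical subscriber, for illustration only.
// handle:    user code run for each element (may throw)
// onFailure: user code that records a failure (may also throw)
final class SafeSubscriber[T <: AnyRef](handle: T => Unit, onFailure: Throwable => Unit)
    extends Subscriber[T] {

  @volatile private var subscription: Subscription = null

  // A failing failure handler must not escape to the caller either.
  private def report(t: Throwable): Unit =
    try onFailure(t) catch { case NonFatal(_) => () }

  override def onSubscribe(s: Subscription): Unit = {
    if (s == null) throw new NullPointerException("Subscription must not be null")
    subscription = s
    s.request(1)
  }

  override def onNext(element: T): Unit = {
    if (element == null) throw new NullPointerException("element must not be null")
    try {
      handle(element)
      subscription.request(1)
    } catch {
      case NonFatal(e) =>
        // The only legal way to signal failure: cancel, then return normally.
        subscription.cancel()
        report(e)
    }
  }

  override def onError(t: Throwable): Unit = {
    if (t == null) throw new NullPointerException("Throwable must not be null")
    report(t) // record the upstream failure; never rethrow from here
  }

  override def onComplete(): Unit = ()
}
```

The NullPointerException checks are the one exception the rule itself
permits; everything else is caught inside the Subscriber.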

Are you sure that Mongo implementation is tested against, and conforms to,
the Reactive Streams spec?


Two: Supervision does not work for arbitrary third-party publishers. It only
works within Akka Streams, where specific stages explicitly handle it. It is
an additional feature that Akka Streams provides on top of Reactive Streams.
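
A rough sketch of the difference, assuming the same ActorMaterializer-era
API used in your snippet. The failing publisher below is only a stand-in
built with Source.failed and Sink.asPublisher (it is not the Mongo driver),
and the object and value names are made up for the example. A failure that
arrives from the publisher via onError can be handled with recover, while a
Supervision.Decider only sees exceptions thrown inside a supervised stage
such as map:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.Publisher
import scala.concurrent.Await
import scala.concurrent.duration._

object PublisherFailureSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Stand-in for a third-party publisher that signals onError (hypothetical).
  val failingPublisher: Publisher[String] =
    Source.failed[String](new RuntimeException("boom"))
      .runWith(Sink.asPublisher(fanout = false))

  // An upstream onError is not passed to a decider; handle it with recover.
  val recovered = Source.fromPublisher(failingPublisher)
    .recover { case e: RuntimeException => s"recovered: ${e.getMessage}" }
    .runWith(Sink.seq)

  // The decider applies to exceptions thrown inside the map stage itself.
  val resumeOnBadNumber: Supervision.Decider = {
    case _: NumberFormatException => Supervision.Resume
    case _                        => Supervision.Stop
  }

  val supervised = Source(List("1", "x", "3"))
    .map(_.toInt) // "x" throws NumberFormatException and is dropped
    .withAttributes(ActorAttributes.supervisionStrategy(resumeOnBadNumber))
    .runWith(Sink.seq)

  val recoveredResult = Await.result(recovered, 5.seconds)
  val supervisedResult = Await.result(supervised, 5.seconds)
  println(recoveredResult)
  println(supervisedResult)
  system.terminate()
}
```

recover emits one final element and then completes the stream, so it is a
way to observe or replace the failure, not to resume the publisher.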

Hope this helps

-- 
Konrad `ktoso` Malawski
Akka <http://akka.io> @ Lightbend <http://lightbend.com>

On 22 February 2017 at 00:04:06, antonin perrot-audet ([email protected])
wrote:

Hello,

Has anyone succeeded in getting a Source created with fromPublisher() to
honor the SupervisionStrategy defined in the ActorMaterializer?


I have this publisher:
MongoClients.create(mongoSettings).listDatabaseNames()
It throws an exception on the onError callback, but the errorDecider
never gets called. The stream collapses on a failed Future:


    implicit val system2 = ActorSystem("Sys2")
    import system2.dispatcher

    val errorDecider: Supervision.Decider = {
      case _ =>
        println("errorDecider does something, yeiii !")
        Supervision.stop
    }

    implicit val materializer = ActorMaterializer(
      ActorMaterializerSettings(system2).withSupervisionStrategy(errorDecider))

    val publiString: Publisher[String] =
      MongoClients.create(mongoSettings).listDatabaseNames()
    val stream: Source[String, NotUsed] = Source.fromPublisher(publiString)

    stream
      .map { s => println(s); s }
      .runWith(Sink.ignore)
      .onComplete(_ => system2.terminate())



Thanks in advance for your responses.

Best,

Antonin PA
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups
"Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
