[ 
https://issues.apache.org/jira/browse/CAMEL-11615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120433#comment-16120433
 ] 

Qaiser Abbasi commented on CAMEL-11615:
---------------------------------------

After digging through the codebase I've a proposal which I wanted to discuss 
with you before I go out for the actual implementation and testing of it.

Currently there's no way to stop clients calling the 
CamelReactiveStreamsService#fromStream static factory method right after 
instantiating the CamelReactiveStreamsService, i.e. not waiting for the actual 
bootstrapping of the service. (One could also provide a kind of callback 
method, which would be executed when the bootstrapping is done. But that would 
look rather unusual in my POV) TBH I think the client shouldn't be bothered 
with the internals of the service. Long story short: I'd propose that we 
enqueue all fromStream()-calls inside the service and would execute them right 
after when CamelReactiveStreamsService#doStart is called. Otherwise – as you 
already pointed out – we would never have a backed ThreadPool inside the 
created publisher & subscribers.

Regarding the StartupListener: this would not work for services but rather for 
components, as you may know. I think, CamelReactiveStreamsService#doStart 
provides the same mimic/mechanism for this synchronisation-logic. What do you 
say?

> WorkerPool is null for auto-configured RxJava-based application
> ---------------------------------------------------------------
>
>                 Key: CAMEL-11615
>                 URL: https://issues.apache.org/jira/browse/CAMEL-11615
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-reactive-streams
>    Affects Versions: 2.19.1
>         Environment: OS: OSX 10.11.6
> JDK: Oracle 1.8.0_141
> MVN: 3.5.0
>            Reporter: Qaiser Abbasi
>             Fix For: 2.20.0, 2.19.3
>
>
> In https://github.com/qabbasi/reactive-camel-demo I'm using the 
> camel-reactive-streams-starter project to create a application based on 
> RxJava2. Currently I'm getting a NPE during runtime.
> I also tried to test the sample route inside 
> https://github.com/apache/camel/tree/master/examples/camel-example-reactive-streams,
>  but unfortunately this also results in the same exception:
> java.lang.NullPointerException: null
>         at 
> org.apache.camel.component.reactive.streams.engine.CamelSubscription.checkAndFlush(CamelSubscription.java:123)
>  ~[camel-reactive-streams-2.19.1.jar:2.19.1]
>         at 
> org.apache.camel.component.reactive.streams.engine.CamelSubscription.publish(CamelSubscription.java:247)
>  ~[camel-reactive-streams-2.19.1.jar:2.19.1]
>         at 
> org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:100)
>  ~[camel-reactive-streams-2.19.1.jar:2.19.1]
>         at 
> org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:123)
>  ~[camel-reactive-streams-2.19.1.jar:2.19.1]
>         at 
> org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:44)
>  ~[camel-reactive-streams-2.19.1.jar:2.19.1]
>         at 
> org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145) 
> ~[camel-core-2.19.1.jar:2.19.1]
>         at 
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>  ~[camel-core-2.19.1.jar:2.19.1]
>         at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>  ~[camel-core-2.19.1.jar:2.19.1]
>         at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>  [camel-core-2.19.1.jar:2.19.1]
>         at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) 
> ~[camel-core-2.19.1.jar:2.19.1]
>         at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) 
> ~[camel-core-2.19.1.jar:2.19.1]
>         at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>  [camel-core-2.19.1.jar:2.19.1]
>         at 
> org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:197)
>  [camel-core-2.19.1.jar:2.19.1]
>         at 
> org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:79) 
> [camel-core-2.19.1.jar:2.19.1]
>         at java.util.TimerThread.mainLoop(Timer.java:555) [na:1.8.0_141]
>         at java.util.TimerThread.run(Timer.java:505) [na:1.8.0_141]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to