[ https://issues.apache.org/jira/browse/CXF-8022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819643#comment-16819643 ]
KimJohn Quinn edited comment on CXF-8022 at 4/17/19 1:23 AM:
-------------------------------------------------------------
This definitely gets past the thread hanging, which is a good thing, but now I
am looking at the exception handling, which worked prior to the patch: the
response is now always coming back as a 200.
I have a test stream that looks like this:
{code:java}
return super
    .findArtifact(artifact, project, client)
    .cast(Ruleset.class)
    .flatMapMany(a -> {
        // For each file generate rules
        return fromIterable(files)
            .map(File::new)
            .filter(GeneratorResource::checkExtension)
            .publishOn(elastic())
            .flatMap(f -> this.decisionTableService
                .getSources(f)
                // For each sheet
                .flatMap(sheet -> this.decisionTableService
                    // Generate details
                    .generateTable(sheet, f)
                    .flatMapMany(table -> this.decisionTableService
                        // Generate rows
                        .generateRules(table, f)
                        // Buffer rows
                        .buffer(buffer)
                        // Find and remove matches then merge for save
                        .flatMap(rules -> replace(rules, a)))));
    })
    .onErrorMap(RulesetResourceException::new);
{code}
The *findArtifact* used to throw a NotFoundException which resulted in the
ExceptionMapper kicking in and returning the proper HTTP error (404).
The code from *findArtifact* looks like:
{code:java}
return this
    .findProject(project, client)
    .flatMap(p -> this.artifactService
        .findByName(artifact, p)
        .switchIfEmpty(error(NotFoundException::new)))
    .onErrorMap(ArtifactResourceException::new);
{code}
And the CXF ExceptionMapper handles the exception. So, if I have a bad
"artifact" or "project", either one would throw the exception and should
terminate the stream. With the patch, the exception is not being returned and
the resource always results in a 200, but I can see in the log that the
ExceptionMapper did in fact pick up the error (which leads me to believe it
might be something in how I am handling the exception).
But, at least right now, it looks like any exceptions leading up to
"getSources", where I am forcibly throwing a RuntimeException, get passed up.
This could be me handling errors improperly, so I am investigating it with the
patch applied. I was under the assumption that onErrorMap is treated as the
equivalent of rethrowing, so that I can get to the ExceptionMapper, but I just
wanted to get my thoughts down and over to you.
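To make that assumption concrete (mapping an error replaces it with another
error, so the pipeline still ends in failure), here is a minimal sketch that
uses the JDK's CompletableFuture as a stand-in for Reactor. NotFound,
ResourceFailure, mapError and observedFailure are hypothetical names made up
for the illustration; they are not the application's or CXF's classes, and
this says nothing about what the patch itself does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

public class ErrorMappingSketch {

    /** Hypothetical stand-in for the NotFoundException mentioned in the comment. */
    static final class NotFound extends RuntimeException {
        NotFound(String message) { super(message); }
    }

    /** Hypothetical stand-in for a wrapping exception such as ArtifactResourceException. */
    static final class ResourceFailure extends RuntimeException {
        ResourceFailure(Throwable cause) { super(cause); }
    }

    /** Replaces a failure with a mapped failure; successful values pass through untouched. */
    static <T> CompletableFuture<T> mapError(
            CompletableFuture<T> source,
            Function<Throwable, ? extends RuntimeException> mapper) {
        return source.handle((value, error) -> {
            if (error == null) {
                return value;
            }
            // Dependent stages see failures wrapped in CompletionException; unwrap first.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause()
                    : error;
            // Throwing here keeps the result failed, now with the mapped exception.
            throw mapper.apply(cause);
        });
    }

    /** Returns the failure a caller would observe, or null if the future succeeded. */
    static Throwable observedFailure(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> missing =
                CompletableFuture.failedFuture(new NotFound("no such artifact"));
        Throwable seen = observedFailure(mapError(missing, ResourceFailure::new));
        System.out.println(seen.getClass().getSimpleName());            // ResourceFailure
        System.out.println(seen.getCause().getClass().getSimpleName()); // NotFound
    }
}
```

In this sketch the caller still sees a failure after the mapping step, with the
original exception kept as the cause. If the Reactor pipeline behaves the same
way, the mapped exception should still reach whatever turns errors into HTTP
responses.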
I am looking at my exception handling now so don't take this as me saying the
patch does not work ;). In fact, I'm inclined to believe it is something I am
doing wrong...
And by the way, thank you for your help too - it is greatly appreciated.
> Thread hangs using Reactor Flux when Exception is Thrown
> --------------------------------------------------------
>
> Key: CXF-8022
> URL: https://issues.apache.org/jira/browse/CXF-8022
> Project: CXF
> Issue Type: Bug
> Components: JAX-RS
> Affects Versions: 3.2.8, 3.3.1
> Environment: * JDK 11
> * CXF 3.3.1
> * Spring Boot (latest)
> * Undertow (latest)
> * Reactor (latest)
> Reporter: KimJohn Quinn
> Assignee: Andriy Redko
> Priority: Blocker
> Labels: cxf, reactor, spring-boot
> Time Spent: 10m
> Remaining Estimate: 0h
>
> We have run into a serious issue with CXF and Reactor in Undertow (using
> Spring Boot) that was not expected or caught until late...
> The Flux throws an exception inside and hangs indefinitely, requiring the
> process to be forcibly killed (kill -9).
> We are using Spring Boot (latest), CXF (latest) with the Reactor extension,
> Undertow and Reactor. Has anyone run into this, and is there, or could there
> be, any potential workaround to handle it?
> We wanted to stick with using CXF for both our standard REST resources and
> Reactor resources because CXF has always worked well for us plus we are using
> the OpenAPI swagger integration. Not being able to resolve this or have an
> established workaround may cause us to have to completely revert to Spring
> controllers.
> There seems to be a problem with the onComplete handling of Fluxes when it
> throws a RuntimeException. Basically, if you try to call the endpoint, it
> will hang. I've tracked it down to the writeTo function in
> StreamingAsyncSubscriber, where the queue is empty but "completed" is still
> false, so it goes into an infinite loop. You can reproduce it by running the
> code below.
> {code:java}
> @Path("/errors")
> @Produces(APPLICATION_JSON)
> public Flux<String> errors()
> {
>     Flux<String> response = Flux
>         .range(1, 5)
>         .flatMap(item ->
>         {
>             if (item <= 4)
>             {
>                 return Mono.just("item: " + item);
>             }
>             else
>             {
>                 System.out.println("---Hitting exception");
>                 return Mono.error(new RuntimeException("runtime error"));
>             }
>         });
>     return response;
> }
> {code}
>
> I also posted this on the forums hoping someone can assist -
> [http://cxf.547215.n5.nabble.com/Thread-hangs-when-using-reactor-cxf-and-undertow-td5796369.html]
>
> The log output, up until manually killing the process looks like:
> [DEBUG] o.a.c.j.interceptor.JAXRSInInterceptor : Found operation: errors
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Invoking handleMessage on interceptor org.apache.cxf.interceptor.OneWayProcessorInterceptor@6e760b48
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Invoking handleMessage on interceptor org.apache.cxf.interceptor.ServiceInvokerInterceptor@6d7702a0
> [DEBUG] o.a.cxf.service.invoker.AbstractInvoker : Invoking method public reactor.core.publisher.Flux io.logicdrop.drools.endpoints.DroolsRulesetResource.errors() on object io.logicdrop.drools.endpoints.DroolsRulesetResource@23718909 with params [].
> [DEBUG] reactor.util.Loggers$LoggerFactory : Using Slf4j logging framework
> ---Hitting exception
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Invoking handleMessage on interceptor org.apache.cxf.interceptor.OutgoingChainInterceptor@308be65a
> [DEBUG] o.a.c.i.OutgoingChainInterceptor : Interceptors contributed by bus: []
> [DEBUG] o.a.c.i.OutgoingChainInterceptor : Interceptors contributed by service: []
> [DEBUG] o.a.c.i.OutgoingChainInterceptor : Interceptors contributed by endpoint: [org.apache.cxf.interceptor.MessageSenderInterceptor@3f0061c3]
> [DEBUG] o.a.c.i.OutgoingChainInterceptor : Interceptors contributed by binding: [org.apache.cxf.jaxrs.interceptor.JAXRSOutInterceptor@5419dbea]
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Adding interceptor org.apache.cxf.interceptor.MessageSenderInterceptor@3f0061c3 to phase prepare-send
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Adding interceptor org.apache.cxf.jaxrs.interceptor.JAXRSOutInterceptor@5419dbea to phase marshal
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Chain org.apache.cxf.phase.PhaseInterceptorChain@63b5b52 was created. Current flow:
>   prepare-send [MessageSenderInterceptor]
>   marshal [JAXRSOutInterceptor]
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Invoking handleMessage on interceptor org.apache.cxf.interceptor.MessageSenderInterceptor@3f0061c3
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Adding interceptor org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor@30cf09f to phase prepare-send-ending
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Chain org.apache.cxf.phase.PhaseInterceptorChain@63b5b52 was modified. Current flow:
>   prepare-send [MessageSenderInterceptor]
>   marshal [JAXRSOutInterceptor]
>   prepare-send-ending [MessageSenderEndingInterceptor]
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Invoking handleMessage on interceptor org.apache.cxf.jaxrs.interceptor.JAXRSOutInterceptor@5419dbea
> [DEBUG] o.a.c.j.interceptor.JAXRSOutInterceptor : Response content type is: application/json
> [DEBUG] o.apache.cxf.ws.addressing.ContextUtils : retrieving MAPs from context property javax.xml.ws.addressing.context.inbound
> [DEBUG] o.apache.cxf.ws.addressing.ContextUtils : WS-Addressing - failed to retrieve Message Addressing Properties from context
> Same scenario with a Mono appears to show the stream "completing"...
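To illustrate the hang described in the issue (queue empty while "completed"
stays false), here is a minimal JDK-only sketch. It is not the CXF
StreamingAsyncSubscriber code: QueueingSubscriber, finishOnError, drain and
run are hypothetical names, and the timeout exists only so that the demo
itself cannot hang. It assumes the writer loop is meant to exit once a
terminal signal has been recorded and the queue has been drained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DrainLoopSketch {

    /** Hypothetical subscriber that buffers items for a writer loop to drain. */
    static final class QueueingSubscriber implements Flow.Subscriber<String> {
        private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private final boolean finishOnError;
        private volatile boolean completed;
        private volatile Throwable failure;

        QueueingSubscriber(boolean finishOnError) {
            this.finishOnError = finishOnError;
        }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { queue.add(item); }
        @Override public void onComplete() { completed = true; }

        @Override public void onError(Throwable t) {
            failure = t;
            // If an error is not treated as terminal, "completed" stays false forever.
            if (finishOnError) {
                completed = true;
            }
        }

        Throwable failure() { return failure; }

        /** Writer loop: returns true if it ended on its own, false if the demo timeout hit. */
        boolean drain(List<String> out, long timeoutMillis) {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (!completed || !queue.isEmpty()) {
                if (System.nanoTime() - deadline > 0) {
                    return false; // without this guard the loop would spin forever
                }
                try {
                    String item = queue.poll(10, TimeUnit.MILLISECONDS);
                    if (item != null) {
                        out.add(item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    /** Emits four items and then an error, like the /errors endpoint in the report. */
    static boolean run(boolean finishOnError, List<String> out) {
        QueueingSubscriber subscriber = new QueueingSubscriber(finishOnError);
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        for (int i = 1; i <= 4; i++) {
            subscriber.onNext("item: " + i);
        }
        subscriber.onError(new RuntimeException("runtime error"));
        return subscriber.drain(out, 200);
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        System.out.println(run(true, written) + " " + written.size());  // true 4
        written.clear();
        System.out.println(run(false, written) + " " + written.size()); // false 4
    }
}
```

With finishOnError set to false the loop only ends because of the demo
timeout, which mirrors the reported behaviour. Treating onError as a terminal
signal lets the loop exit after the four buffered items are written. Whether
the actual patch takes this approach is not shown in this thread.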