Yeah, I think it's caused by the move from Spring 2.5.4 to 2.5.6. The Spring JMS code doesn't seem to tolerate an ungraceful shutdown, which is what I think is happening. I want to dig in a bit further and possibly log a bug with Spring; I just haven't had the time.
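The stack trace quoted below narrows the NPE down a little. The exception is thrown inside String.indexOf itself, called from JmsUtils.buildExceptionMessage, so indexOf was handed a null argument. A null receiver would have failed in buildExceptionMessage instead. One plausible culprit, not verified against the Spring 2.5.6 source, is an exception raised during the ungraceful shutdown whose linked exception has no message.

The class below is a hypothetical null-safe sketch of that kind of "append the linked message unless it's already there" logic. It is not Spring's code, and ExceptionMessages is an invented name. Throwable.getCause() stands in for JMSException.getLinkedException() so that the sketch compiles with the JDK alone.

```java
/**
 * Hypothetical, null-safe sketch of "append the linked exception's message
 * unless the outer message already contains it". Not Spring's code.
 */
final class ExceptionMessages {
    private ExceptionMessages() {
    }

    static String buildExceptionMessage(Exception ex) {
        String message = ex.getMessage();
        // Stand-in for JMSException.getLinkedException() in this sketch.
        Throwable linked = ex.getCause();
        if (linked == null) {
            return message;
        }
        String linkedMessage = linked.getMessage();
        if (linkedMessage == null) {
            // Passing null to String.indexOf would throw the NPE seen in the trace.
            return message;
        }
        if (message == null) {
            return linkedMessage;
        }
        if (message.indexOf(linkedMessage) == -1) {
            return message + "; linked exception: " + linkedMessage;
        }
        return message;
    }
}
```

This sketch only shows why a single null message is enough to produce this particular trace. Whether the real fix belongs in Spring or in how the test shuts its listener container down is still the open question.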
Dan

On Tuesday 11 November 2008 1:15:43 pm Sergey Beryozkin wrote:
> Hi, seeing this NPE with the latest trunk:
>
> T E S T S
> -------------------------------------------------------
> Running org.apache.cxf.systest.multitransport.MultiTransportClientServerTest
> Exception in thread "DefaultMessageListenerContainer-1" java.lang.NullPointerException
>     at java.lang.String.indexOf(String.java:1564)
>     at java.lang.String.indexOf(String.java:1546)
>     at org.springframework.jms.support.JmsUtils.buildExceptionMessage(JmsUtils.java:255)
>     at org.springframework.jms.listener.DefaultMessageListenerContainer.refreshConnectionUntilSuccessful(DefaultMessageListenerContainer.java:799)
>     at org.springframework.jms.listener.DefaultMessageListenerContainer.recoverAfterListenerSetupFailure(DefaultMessageListenerContainer.java:767)
>     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:898)
>     at java.lang.Thread.run(Thread.java:595)
> Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 22.425 sec
>
>
> It looks like it's swallowed. Can someone else see it?
>
> Sergey
>
>
> ----- Original Message -----
> From: "Sergey Beryozkin" <[EMAIL PROTECTED]>
> To: "Daniel Kulp" <[EMAIL PROTECTED]>; <[email protected]>
> Sent: Tuesday, November 11, 2008 5:51 PM
> Subject: Re: Jetty Continuations in CXF
>
> > Hi
> >
> >>> I have 10 threads involved, 5 control ones + 5 application ones, and I
> >>> see a message get lost approximately once in 5 runs. The fact that
> >>> cont.resume() is called virtually immediately after cont.suspend() can
> >>> explain it.
> >>
> >> Without seeing your code, I cannot really offer valid suggestions, but
> >> I'll try....
> >> :-)
> >
> > I guess having it all on a branch would be handy then :-)
> >
> >> One thought was, in the Continuation object, to record if "resume()" has
> >> been called, and if it's been called by the time the stack unwinds back
> >> into the HTTP transport, just re-dispatch immediately. Either that or
> >> have the resume block until the HTTP transport sets a "ready to resume"
> >> flag just before it allows the exception to flow back into Jetty.
> >
> > I have 2 tests.
> >
> > In one test, the application server code interacts with a wrapper, both
> > when getting a continuation instance and when calling suspend/resume on
> > it (as you suggested earlier in this thread). In this case, under
> > the hood, an inbound message is associated with a continuation instance
> > before suspend() is called on it. Thus even if the resulting exception
> > does not reach JettyDestination before continuation.resume() is
> > called by a control thread, the message is not lost when the HTTP request
> > is resumed, as that HTTP request had this continuation instance associated
> > with it at the time ContinuationSupport.getContinuation(request) was
> > called.
> >
> > In the other test, which I believe better represents an integration
> > scenario with SMX, the application server code calls Jetty's
> > ContinuationSupport.getContinuation(request) followed by
> > continuation.suspend(). Now, in this case, before the (Jetty RetryRequest)
> > runtime exception reaches a catch block in AbstractInvoker (where I try
> > to associate a message with the continuation), one or two control threads
> > manage to squeeze in and call resume() before the catch block has even
> > been processed. So by the time the wrapped exception reaches
> > JettyDestination, a request with a resumed continuation has already come
> > back...
> >
> > Does this explanation of the second case and the associated race condition
> > sound reasonable?
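Dan's first suggestion above can be sketched as a small state holder. The idea is to remember whether resume() was called before the stack has unwound back into the HTTP transport, and if so, re-dispatch immediately.

Everything here is hypothetical: TrackingContinuation, SuspendedInvocationException, stackUnwound() and the redispatch Runnable are invented names, not CXF or Jetty API. The plain Runnable stands in for whatever the transport would actually do to restart the request.

```java
/** Hypothetical exception used to unwind the stack when an invocation suspends. */
final class SuspendedInvocationException extends RuntimeException {
    SuspendedInvocationException() {
        super("invocation suspended");
    }
}

/**
 * Hypothetical continuation wrapper that remembers an early resume().
 * Exactly one party re-dispatches: resume() if the stack has already
 * unwound, otherwise the transport's catch block via stackUnwound().
 */
final class TrackingContinuation {
    private final Runnable redispatch; // how the transport restarts the request
    private boolean suspended;
    private boolean resumeRequested;
    private boolean unwound;

    TrackingContinuation(Runnable redispatch) {
        this.redispatch = redispatch;
    }

    /** Called by application code; always throws to unwind the stack. */
    synchronized void suspend() {
        suspended = true;
        resumeRequested = false;
        unwound = false;
        throw new SuspendedInvocationException();
    }

    /** Called by a control thread, possibly before the exception has unwound. */
    void resume() {
        boolean dispatchNow;
        synchronized (this) {
            if (!suspended || resumeRequested) {
                return; // not suspended, or a resume is already pending
            }
            resumeRequested = true;
            dispatchNow = unwound; // safe only once the old stack is gone
            if (dispatchNow) {
                suspended = false;
            }
        }
        if (dispatchNow) {
            redispatch.run(); // run outside the lock
        }
    }

    /**
     * Called by the transport when the exception reaches it. Returns true if
     * resume() already happened, meaning the transport must re-dispatch now.
     */
    synchronized boolean stackUnwound() {
        unwound = true;
        if (resumeRequested) {
            suspended = false;
            return true;
        }
        return false;
    }
}
```

The synchronized blocks make the "who re-dispatches" decision atomic. That covers Sergey's second test, where control threads call resume() before the exception reaches the catch block: the request is restarted once, either by the transport or by resume(), and never lost.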
> >
> > Cheers, Sergey
> >
> >> Dan
> >>
> >>> Cheers, Sergey
> >>>
> >>> > That said, I'm now trying to inject a message as a custom
> >>> > continuation object (while preserving the original one, if any, both
> >>> > ways) as early as possible, in AbstractInvoker, so that the time window
> >>> > in which the race condition I talked about earlier can cause the loss
> >>> > of the original message is extremely small: the time it takes for the
> >>> > continuation.suspend() exception to reach a catch block in
> >>> > AbstractInvoker.
> >>> >
> >>> > Cheers, Sergey
> >>> >
> >>> >> Hi,
> >>> >>
> >>> >> I did some system testing with Jetty continuations and it's going
> >>> >> reasonably well. Here's one issue I've encountered which might or
> >>> >> might not be a problem in cases where continuations are utilized
> >>> >> directly (that is, without our wrappers), as in the case of, say, the
> >>> >> ServiceMix CXF binding component.
> >>> >>
> >>> >> The problem is that when continuation.suspend(timeout) has been
> >>> >> called, the resulting RuntimeException might not reach the CXF
> >>> >> JettyDestination (where the original message with its phase
> >>> >> chain can be preserved until the request is resumed) before some other
> >>> >> application thread calls continuation.resume() or the continuation
> >>> >> suspend timeout expires.
> >>> >>
> >>> >> In the case of ServiceMix, the latter is at least a theoretical
> >>> >> possibility. I can see in its code that this timeout is configured,
> >>> >> but if this timeout is in the region of up to 1 sec or so, then it's
> >>> >> feasible that under a heavy workload the race condition described
> >>> >> above might come to life.
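Dan's other option maps naturally onto a CountDownLatch. In that option, resume() blocks until the transport sets a "ready to resume" flag just before it lets the exception flow back into Jetty.

This is a sketch under assumptions. GatedContinuation, markReadyToResume() and the resumeAction Runnable are hypothetical names. The bounded wait is an added design choice, so that a control thread can't hang forever if the transport never reaches that point.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical continuation wrapper whose resume() waits until the transport
 * says it has saved the message and is about to let the exception reach Jetty.
 */
final class GatedContinuation {
    private final CountDownLatch readyToResume = new CountDownLatch(1);
    private final Runnable resumeAction; // e.g. whatever would call Jetty's resume

    GatedContinuation(Runnable resumeAction) {
        this.resumeAction = resumeAction;
    }

    /** Transport: call just before the suspend exception flows back to the container. */
    void markReadyToResume() {
        readyToResume.countDown();
    }

    /**
     * Control thread: blocks (bounded) until the transport is ready, then resumes.
     * Returns false if the transport never signalled within the timeout.
     */
    boolean resume(long timeout, TimeUnit unit) {
        try {
            if (!readyToResume.await(timeout, unit)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        resumeAction.run();
        return true;
    }
}
```

This closes the early-resume window but not the other half of the race described above: the suspend timeout expiring in Jetty before the message is saved. It also costs a blocked control thread per early resume, which the "record and re-dispatch" approach avoids.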
> >>> >>
> >>> >> That said, as part of my test, I found that even when such a
> >>> >> condition occurs, the 'worst' thing which can happen is that a new
> >>> >> message and a new chain are created; that is, the request is not
> >>> >> resumed from a 'suspended' ServiceInvokerInterceptor, but starts as if
> >>> >> it were a new request altogether. It all works nonetheless, as all the
> >>> >> stack variables used in the various interceptors (in my test, at
> >>> >> least) are obtained from the message. The only downside is that the
> >>> >> work which has already been done as part of handling the suspended
> >>> >> request is repeated by the interceptors. That can cause issues,
> >>> >> though, when some interceptors have side effects as part of handling
> >>> >> a given input request, say, modifying a db.
> >>> >>
> >>> >> Now, this race condition can be safely avoided if the wrapper proposed
> >>> >> by Dan is used by the server application code, as the message can be
> >>> >> preserved immediately at the point the user calls suspend on our
> >>> >> wrapper, so without further ado I've prototyped it too. It's not
> >>> >> possible for SMX components, though.
> >>> >>
> >>> >> Comments?
> >>> >>
> >>> >> Cheers, Sergey
> >>> >>
> >>> >>> I guess my thinking was to tie the continuations directly to the
> >>> >>> PhaseInterceptorChain (since that is going to need to know about
> >>> >>> them anyway). However, I suppose it could easily be done with a
> >>> >>> new interface. Probably the best thing to do is to stub out a
> >>> >>> sample use case. So here goes...
> >>> >>>
> >>> >>> Let's take a "GreetMe" web service that, in the greetMe method, will
> >>> >>> call off asynchronously to some JMS service to actually get the
> >>> >>> result.
> >>> >>>
> >>> >>> @Resource(name = "jmsClient")
> >>> >>> Greeter jmsGreeter;
> >>> >>> @Resource
> >>> >>> WebServiceContext context;
> >>> >>>
> >>> >>> public String greetMe(String arg) {
> >>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
> >>> >>>         context.getMessageContext().get(ContinuationSupport.class.getName());
> >>> >>>     if (contSupport == null) {
> >>> >>>         // continuations not supported, must wait
> >>> >>>         return jmsGreeter.greetMe(arg);
> >>> >>>     }
> >>> >>>     Continuation cont = contSupport.getContinuation();
> >>> >>>     if (cont.isResumed()) {
> >>> >>>         // the Handler passed to suspend() now holds the JMS reply
> >>> >>>         Handler handler = (Handler) cont.getObject();
> >>> >>>         return handler.resp.getReturn();
> >>> >>>     } else {
> >>> >>>         Handler handler = new Handler(cont);
> >>> >>>         jmsGreeter.greetMeAsync(arg, handler);
> >>> >>>         cont.suspend(handler);
> >>> >>>         return null; // won't actually get here as suspend will throw a
> >>> >>>                      // ContinuationException
> >>> >>>     }
> >>> >>> }
> >>> >>>
> >>> >>> The Handler would look something like:
> >>> >>>
> >>> >>> class Handler implements AsyncHandler<GreetMeResponse> {
> >>> >>>     volatile GreetMeResponse resp; // written by the JMS thread, read after resume
> >>> >>>     Continuation cont;
> >>> >>>     public Handler(Continuation cont) {
> >>> >>>         this.cont = cont;
> >>> >>>     }
> >>> >>>     public void handleResponse(Response<GreetMeResponse> response) {
> >>> >>>         try {
> >>> >>>             resp = response.get();
> >>> >>>         } catch (Exception e) {
> >>> >>>             // InterruptedException/ExecutionException: resp stays null;
> >>> >>>             // real code would record the failure for greetMe() to report
> >>> >>>         }
> >>> >>>         cont.resume();
> >>> >>>     }
> >>> >>> }
> >>> >>>
> >>> >>> Basically, the HTTP/Jetty transport could provide an implementation
> >>> >>> of ContinuationSupport that wraps the Jetty stuff. JMS could
> >>> >>> provide one that's pretty much a no-op. Transports that cannot
> >>> >>> support it (like servlet) just wouldn't provide an implementation.
> >>> >>>
> >>> >>>
> >>> >>> Does that make sense? Other ideas?
> >>> >>>
> >>> >>> Dan
> >>> >>>
> >>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
> >>> >>>> > No. We don't want that. Whatever we do should work for other
> >>> >>>> > transports as well, like JMS.
> >>> >>>> > Thus, this shouldn't be tied to
> >>> >>>> > Jetty continuations directly.
> >>> >>>>
> >>> >>>> No, I'm not suggesting tying it to Jetty continuations.
> >>> >>>> E.g.:
> >>> >>>>
> >>> >>>> try {
> >>> >>>>     invoke(); // continuation.suspend() gets called somehow by the code being invoked
> >>> >>>> } catch (RuntimeException ex) {
> >>> >>>>     if (ex.getClass().getName().equals("jetty.JettyContinuationException")) {
> >>> >>>>         // or PhaseInterceptorChain.suspend()
> >>> >>>>         throw new SuspendedFault(ex);
> >>> >>>>     }
> >>> >>>>     throw ex; // anything else propagates unchanged
> >>> >>>> }
> >>> >>>>
> >>> >>>> > Most likely, we could add a "suspend()" method to
> >>> >>>> > PhaseInterceptorChain that would do something very similar and
> >>> >>>> > throw a "SuspendException" or something in the same package as
> >>> >>>> > PhaseInterceptorChain.
> >>> >>>>
> >>> >>>> When do we trigger this PhaseInterceptorChain.suspend() call,
> >>> >>>> though?
> >>> >>>>
> >>> >>>> > That would get propagated
> >>> >>>> > back to the JettyDestination, which could then call the Jetty
> >>> >>>> > things. The JMS transport could just catch it and more or less
> >>> >>>> > ignore it. We'd then have to add a "resume()" method to the
> >>> >>>> > chain which would call back onto a listener that the transport
> >>> >>>> > provides. Jetty would just call the Jetty resume stuff. JMS
> >>> >>>> > would probably put a runnable on the work queue to restart the
> >>> >>>> > chain.
> >>> >>>>
> >>> >>>> Ok
> >>> >>>>
> >>> >>>> > Also, suspend() would need to check if there is a listener. If
> >>> >>>> > not, it should not throw the exception. Thus, the servlet
> >>> >>>> > transport and CORBA stuff that couldn't do this would pretty
> >>> >>>> > much just ignore it.
> >>> >>>>
> >>> >>>> Ok, not sure I understand about the listener, but I think I see
> >>> >>>> what you mean...
> >>> >>>>
> >>> >>>> > Basically, this needs to be done in such a way that it CAN work
> >>> >>>> > for the non-Jetty cases.
> >>> >>>> > However, it also needs to be done in
> >>> >>>> > a way that doesn't affect existing transports.
> >>> >>>>
> >>> >>>> +1
> >>> >>>>
> >>> >>>> Cheers, Sergey
> >>> >>>>
> >>> >>>> > Dan
> >>> >>>> >
> >>> >>>> >> 2. Now, if the above can be figured out, the next problem
> >>> >>>> >> arises: when the "trigger" to wake up the continuation occurs.
> >>> >>>> >>
> >>> >>>> >> I think we can do something in JettyDestination similar to
> >>> >>>> >> what is done in SMX. When getting a SuspendedFault exception,
> >>> >>>> >> we can extract the original continuation instance from it, or
> >>> >>>> >> else we can do ContinuationSupport.getContinuation(request),
> >>> >>>> >> which should return the instance to us. At this point we can use
> >>> >>>> >> it as a key to store the current exchange plus all the other
> >>> >>>> >> info we may need.
> >>> >>>> >>
> >>> >>>> >> When the user/application code does continuation.resume(), the
> >>> >>>> >> Jetty thread will come back and we will use
> >>> >>>> >> ContinuationSupport.getContinuation(request) to get the
> >>> >>>> >> active continuation, use it to extract the suspended
> >>> >>>> >> exchange and proceed from there, say by calling
> >>> >>>> >> PhaseInterceptorChain.resume(), etc., something along the lines
> >>> >>>> >> you suggested.
> >>> >>>> >>
> >>> >>>> >>
> >>> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty
> >>> >>>> >> much everything to make sure nothing is stored on the stack and
> >>> >>>> >> that everything is "resumable". Once that is done, the rest is
> >>> >>>> >> relatively easy.
> >>> >>>> >>
> >>> >>>> >> Yeah, that can probably be quite challenging.
> >>> >>>> >>
> >>> >>>> >>
> >>> >>>> >> Thoughts?
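Dan's transport-neutral proposal in this thread has three parts:

- the chain gets a suspend() that throws only when the transport has registered a listener;
- resume() calls back into that listener;
- for JMS, the listener would put a runnable on the work queue.

That contract can be modelled with a toy chain. SketchChain, ResumeListener and WorkQueueResumeListener are hypothetical names, and SuspendException is named after the "SuspendException or something" in the proposal. This is not PhaseInterceptorChain's API. The remainingInterceptors Runnable stands in for re-running the rest of the chain.

```java
import java.util.concurrent.Executor;

/** Hypothetical exception thrown by SketchChain.suspend() when the transport can resume. */
final class SuspendException extends RuntimeException {
    SuspendException() {
        super("interceptor chain suspended");
    }
}

/** Hypothetical callback a transport registers with the chain. */
interface ResumeListener {
    void onResume(SketchChain chain);
}

/** Toy stand-in for an interceptor chain: only the suspend/resume contract is modelled. */
final class SketchChain {
    private final ResumeListener listener;          // null: transport cannot suspend
    private final Runnable remainingInterceptors;   // re-runs the rest of the chain

    SketchChain(ResumeListener listener, Runnable remainingInterceptors) {
        this.listener = listener;
        this.remainingInterceptors = remainingInterceptors;
    }

    /** Throws only if some transport is able to resume the chain later. */
    void suspend() {
        if (listener == null) {
            return; // servlet/CORBA-like transports: just carry on synchronously
        }
        throw new SuspendException();
    }

    /** Hands control back to the transport, which decides how to restart. */
    void resume() {
        if (listener != null) {
            listener.onResume(this);
        }
    }

    void restart() {
        remainingInterceptors.run();
    }
}

/** JMS-style listener: restart the chain from a work queue. */
final class WorkQueueResumeListener implements ResumeListener {
    private final Executor workQueue;

    WorkQueueResumeListener(Executor workQueue) {
        this.workQueue = workQueue;
    }

    @Override
    public void onResume(SketchChain chain) {
        workQueue.execute(chain::restart);
    }
}
```

In real use the executor would be the JMS transport's work queue. Running on the caller's thread (Runnable::run) is only handy for a deterministic check. A Jetty listener would instead call the Jetty resume and look the suspended exchange up by continuation, as in point 2 above.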
> >>> >>>> >>
> >>> >>>> >> Cheers, Sergey
> >>> >>>> >
> >>> >>>> > --
> >>> >>>> > Daniel Kulp
> >>> >>>> > [EMAIL PROTECTED]
> >>> >>>> > http://dankulp.com/blog
