You need to associate the continuation with the message before the suspend() method is called, so that it works whenever the message becomes ready. Also, I think you will have to take care of timeouts...

Another thing: it would be nice if you could create a branch and commit your ongoing work there, so that we have something more tangible to discuss ;-) We may as well just drop it later; it does not really matter.
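A minimal sketch of the ordering suggested above. It assumes plain Java concurrency rather than the real Jetty or CXF classes: `MessageContinuation`, `suspend`, `resume` and `expire` are hypothetical names. The message is stored in the same synchronized step that marks the continuation as suspended. A `resume()` that arrives before `suspend()` is remembered, as Dan proposes further down the thread. The timeout and `resume()` compete for the message under one lock, so exactly one of them handles it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper, not a Jetty or CXF class: the inbound message is stored
 * in the same synchronized step that marks the continuation as suspended.
 */
final class MessageContinuation<M> {
    private enum State { IDLE, SUSPENDED, DONE }

    private State state = State.IDLE;
    private M message;
    private boolean resumeRequested; // resume() arrived before suspend()

    /** Returns false if a resume already arrived: the caller should not suspend. */
    synchronized boolean suspend(M inbound) {
        if (state != State.IDLE) {
            throw new IllegalStateException("continuation already used");
        }
        message = inbound; // associate first, then suspend
        state = resumeRequested ? State.DONE : State.SUSPENDED;
        return state == State.SUSPENDED;
    }

    /** Application/control thread: returns the message to re-dispatch, or null. */
    synchronized M resume() {
        if (state == State.IDLE) {
            resumeRequested = true; // remembered for the upcoming suspend()
            return null;
        }
        return claim();
    }

    /** Timeout handler: returns the message to time out, or null if resume() won. */
    synchronized M expire() {
        return state == State.IDLE ? null : claim();
    }

    // Exactly one caller gets the message; later callers see DONE and get null.
    private M claim() {
        if (state != State.SUSPENDED) {
            return null;
        }
        state = State.DONE;
        return message;
    }
}

public class ContinuationRaceDemo {
    /** One trial: suspend() races resume() and a timeout; returns how often the message was handled. */
    static int trial(ExecutorService pool) throws Exception {
        MessageContinuation<String> cont = new MessageContinuation<>();
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch go = new CountDownLatch(1);
        Future<Object> resumer = pool.submit(() -> {
            go.await();
            if (cont.resume() != null) handled.incrementAndGet(); // re-dispatch the request
            return null;
        });
        Future<Object> timer = pool.submit(() -> {
            go.await();
            if (cont.expire() != null) handled.incrementAndGet(); // send a timeout reply
            return null;
        });
        go.countDown();
        if (!cont.suspend("request-1")) handled.incrementAndGet(); // resumed early: carry on inline
        resumer.get();
        timer.get();
        return handled.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            for (int i = 0; i < 5_000; i++) {
                int n = trial(pool);
                if (n != 1) {
                    throw new AssertionError("message handled " + n + " times");
                }
            }
            System.out.println("every message was handled exactly once");
        } finally {
            pool.shutdown();
        }
    }
}
```

Running `main` repeats the race a few thousand times and throws an `AssertionError` if any message is handled zero times or twice. Dan's other option, blocking `resume()` until the transport signals it is ready, would avoid the extra state but can stall the resuming thread.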
On Wed, Nov 12, 2008 at 11:21 AM, Sergey Beryozkin <[EMAIL PROTECTED]> wrote:
> Hi,
>
> I have had a look. At the moment I don't see why we would have to do this sort of sophisticated handling of continuations in CXF JettyDestination. With CXF, it's the code being invoked further down the line (be it SMX CXF binding components or application code) which needs to worry about either suspending or resuming continuations.
>
> As far as CXF is concerned, it only needs to be able to associate a given inbound message with a continuation instance. I reckon saving it as a continuation user object (preserving the previously set one, if any) is a lighter/simpler alternative to introducing maps in the JettyDestination.
>
> However, as I said a few times earlier in this thread, there's a race condition which I observe in certain cases. Specifically, I have a test where a continuation is resumed virtually immediately after it's been suspended, so continuation.resume() has already occurred before the code that associates this suspended continuation with the inbound message has had a chance to run. In the CXF case, I believe it can happen irrespective of how we write the code dealing with continuations under the hood. It won't happen if the application code uses continuation wrappers.
>
> Do you have any comments about this race condition? Or on how the code you linked to can help avoid it?
>
> Cheers, Sergey
>
>> I would really encourage you to take a look at the smx code for handling continuations.
>> We've had quite a hard time handling race conditions, timeouts, etc., because the continuation has a timeout, and when the message is received back around the time the timeout expires, things can become a bit tricky.
>>
>> https://svn.apache.org/repos/asf/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
>>
>> We use one concurrent hash map to associate a message id with a continuation, and multiple synchronization blocks on the continuation itself.
>> Also, the above code can be used with standard servlet servers (i.e. when the continuation is a blocking continuation), which IMHO is a good thing.
>>
>> On Tue, Nov 11, 2008 at 6:51 PM, Sergey Beryozkin <[EMAIL PROTECTED]> wrote:
>>>
>>> Hi
>>>
>>>>>
>>>>> I have 10 threads involved, 5 control ones + 5 application ones, and I see a message being lost approximately once in 5 cases. The fact that cont.resume() is done virtually immediately after cont.suspend() can explain it.
>>>>
>>>> Without seeing your code, I cannot really offer valid suggestions, but I'll try... :-)
>>>
>>> I guess having it all on a branch would be handy then :-)
>>>
>>>>
>>>> One thought was: in the Continuation object, record whether "resume()" has been called, and if it has been called by the time the stack unwinds back into the HTTP transport, just re-dispatch immediately. Either that, or have the resume block until the HTTP transport sets a "ready to resume" flag just before it allows the exception to flow back into Jetty.
>>>
>>> I have 2 tests.
>>>
>>> In one test, the application server code interacts with a wrapper, both when getting a continuation instance and when calling suspend/resume on it (as you suggested earlier in this thread). In this case, under the hood, an inbound message is associated with a continuation instance before suspend() is called on it.
>>> Thus, even if the resulting exception does not reach JettyDestination before continuation.resume() is called by a control thread, the message is not lost when the HTTP request is resumed, as that HTTP request already had this continuation instance associated with it at the time ContinuationSupport.getContinuation(request) was called.
>>>
>>> In the other test, which I believe better represents an integration scenario with SMX, the application server code calls Jetty's ContinuationSupport.getContinuation(request) followed by continuation.suspend(). Now, in this case, before the (Jetty RetryRequest) runtime exception reaches a catch block in AbstractInvoker (where I try to associate a message with the continuation), one or two control threads manage to squeeze in and call resume() before the catch block has even been processed. So by the time the wrapped exception reaches JettyDestination, a request with a resumed continuation has already come back...
>>>
>>> Does this explanation of the second case and the associated race condition sound reasonable?
>>>
>>> Cheers, Sergey
>>>
>>>>
>>>> Dan
>>>>
>>>>>
>>>>> Cheers, Sergey
>>>>>
>>>>> > That said, I'm now trying to inject a message as a custom continuation object (while preserving the original one, if any, both ways) as early as possible, in AbstractInvoker, so that the time window in which the race condition I talked about earlier can cause the loss of the original message is extremely small: just the time it takes for the continuation.suspend() exception to reach a catch block in AbstractInvoker.
>>>>> >
>>>>> > Cheers, Sergey
>>>>> >
>>>>> >> Hi,
>>>>> >>
>>>>> >> I did some system testing with Jetty continuations and it's not going too badly.
>>>>> >> Here's one issue I've encountered which might or might not be a problem in cases where continuations are utilized directly (that is, without our wrappers), as in the case of, say, the ServiceMix CXF binding component.
>>>>> >>
>>>>> >> The problem is that when continuation.suspend(timeout) has been called, the resulting RuntimeException might not reach CXF JettyDestination in time (so that the original message with its phase chain can be preserved until the request is resumed) if some other application thread calls continuation.resume() or the continuation's suspend timeout expires first.
>>>>> >>
>>>>> >> In the case of ServiceMix, the latter is at least a theoretical possibility. I can see in its code that this timeout is configured, but if the timeout is in the region of up to 1 sec or so, then it's feasible that under a heavy workload the race condition described above might come to life.
>>>>> >>
>>>>> >> That said, as part of my test, I found that even when such a condition occurs, the 'worst' thing that can happen is that a new message and a new chain are created; that is, the request is not resumed from a 'suspended' ServiceInvokerInterceptor, but starts as if it were a new request altogether. It all works nonetheless, as all the stack variables used in the various interceptors (in my test, at least) are obtained from the message. The only downside is that the work which has already been done as part of handling the suspended request is repeated by the interceptors.
>>>>> >> It can cause issues, though, in cases where some interceptors have side effects as part of handling a given input request, say, modifying a DB, etc.
>>>>> >>
>>>>> >> Now, this race condition can be safely avoided if the wrapper proposed by Dan is used by the server application code, as the message can be preserved immediately at the point where a user calls suspend on our wrapper, so without further doubts I've prototyped it too. It's not possible for SMX components, though.
>>>>> >>
>>>>> >> Comments?
>>>>> >>
>>>>> >> Cheers, Sergey
>>>>> >>
>>>>> >>> I guess my thinking was to tie the continuations directly to the PhaseInterceptorChain (since that is going to need to know about them anyway). However, I suppose it could easily be done with a new interface. Probably the best thing to do is to stub out a sample use case. So here goes...
>>>>> >>>
>>>>> >>> Let's take a "GreetMe" web service whose greetMe method will call off asynchronously to some JMS service to actually get the result.
>>>>> >>>
>>>>> >>> @Resource(name = "jmsClient")
>>>>> >>> Greeter jmsGreeter;
>>>>> >>> @Resource
>>>>> >>> WebServiceContext context;
>>>>> >>>
>>>>> >>> public String greetMe(String arg) {
>>>>> >>>     // ContinuationSupport/Continuation are the proposed CXF interfaces, not Jetty's
>>>>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
>>>>> >>>         context.getMessageContext().get(ContinuationSupport.class.getName());
>>>>> >>>     if (contSupport == null) {
>>>>> >>>         // continuations not supported, must wait
>>>>> >>>         return jmsGreeter.greetMe(arg);
>>>>> >>>     }
>>>>> >>>     Continuation cont = contSupport.getContinuation();
>>>>> >>>     if (cont.isResumed()) {
>>>>> >>>         // second pass: the asynchronous reply has arrived
>>>>> >>>         Handler handler = (Handler) cont.getObject();
>>>>> >>>         return handler.resp.getReturn();
>>>>> >>>     } else {
>>>>> >>>         Handler handler = new Handler(cont);
>>>>> >>>         jmsGreeter.greetMeAsync(arg, handler);
>>>>> >>>         cont.suspend(handler);
>>>>> >>>         return null; // won't actually get here, as suspend will throw a ContinuationException
>>>>> >>>     }
>>>>> >>> }
>>>>> >>>
>>>>> >>> The Handler would look something like:
>>>>> >>>
>>>>> >>> class Handler implements AsyncHandler<GreetMeResponse> {
>>>>> >>>     GreetMeResponse resp;
>>>>> >>>     Continuation cont;
>>>>> >>>     public Handler(Continuation cont) {
>>>>> >>>         this.cont = cont;
>>>>> >>>     }
>>>>> >>>     public void handleResponse(Response<GreetMeResponse> response) {
>>>>> >>>         try {
>>>>> >>>             resp = response.get();
>>>>> >>>         } catch (Exception ex) {
>>>>> >>>             // a real handler would keep the fault so that greetMe() can rethrow it
>>>>> >>>         }
>>>>> >>>         cont.resume();
>>>>> >>>     }
>>>>> >>> }
>>>>> >>>
>>>>> >>> Basically, the HTTP/Jetty transport could provide an implementation of ContinuationSupport that wraps the Jetty stuff. JMS could provide one that's pretty much a no-op. Transports that cannot support it (like servlet) just wouldn't provide an implementation.
>>>>> >>>
>>>>> >>> Does that make sense? Other ideas?
>>>>> >>>
>>>>> >>> Dan
>>>>> >>>
>>>>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>>>>> >>>> > No. We don't want that.
>>>>> >>>> > Whatever we do should work for other transports as well, like JMS. Thus, this shouldn't be tied to Jetty continuations directly.
>>>>> >>>>
>>>>> >>>> No, I'm not suggesting tying it to Jetty continuations. E.g.:
>>>>> >>>>
>>>>> >>>> try {
>>>>> >>>>     invoke(); // continuation.suspend() is called somehow by the code being invoked
>>>>> >>>> } catch (RuntimeException ex) {
>>>>> >>>>     if (ex.getClass().getName().equals("jetty.JettyContinuationException")) {
>>>>> >>>>         // or call PhaseInterceptorChain.suspend() instead
>>>>> >>>>         throw new SuspendedFault(ex);
>>>>> >>>>     }
>>>>> >>>>     throw ex; // any other runtime exception propagates as before
>>>>> >>>> }
>>>>> >>>>
>>>>> >>>> > Most likely, we could add a "suspend()" method to PhaseInterceptorChain that would do something very similar and throw a "SuspendException" or something in the same package as PhaseInterceptorChain.
>>>>> >>>>
>>>>> >>>> When do we trigger this PhaseInterceptorChain.suspend() call, though?
>>>>> >>>>
>>>>> >>>> > That would get propagated back to the JettyDestination, which could then call the Jetty things. The JMS transport could just catch it and more or less ignore it. We'd then have to add a "resume()" method to the chain which would call back onto a listener that the transport provides. Jetty would just call the Jetty resume stuff. JMS would probably put a runnable on the work queue to restart the chain.
>>>>> >>>>
>>>>> >>>> ok
>>>>> >>>>
>>>>> >>>> > Also, suspend() would need to check if there is a listener. If not, it should not throw the exception. Thus, the servlet transport and CORBA stuff that couldn't do this would pretty much just ignore it.
>>>>> >>>>
>>>>> >>>> OK, not sure I understand about the listener, but I think I see what you mean...
>>>>> >>>>
>>>>> >>>> > Basically, this needs to be done in such a way that it CAN work for the non-Jetty cases. However, it also needs to be done in a way that doesn't affect existing transports.
>>>>> >>>>
>>>>> >>>> +1
>>>>> >>>>
>>>>> >>>> Cheers, Sergey
>>>>> >>>>
>>>>> >>>> > Dan
>>>>> >>>> >
>>>>> >>>> >> 2. Now, if the above can be figured out, the next problem arises: when the "trigger" to wake up the continuation occurs.
>>>>> >>>> >>
>>>>> >>>> >> I think we can do something in JettyDestination similar to what is done in SMX. When getting a SuspendedFault exception, we can extract the original continuation instance from it, or else we can call ContinuationSupport.getContinuation(request), which should return the instance to us. At this point we can use it as a key to store the current exchange plus all the other info we may need.
>>>>> >>>> >>
>>>>> >>>> >> When the user/application code does continuation.resume(), the Jetty thread will come back and we will use ContinuationSupport.getContinuation(request) to get the active continuation, use it to extract the suspended exchange, and proceed from there; say, we'll call PhaseInterceptorChain.resume(), etc., something along the lines you suggested.
>>>>> >>>> >>
>>>>> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty much everything to make sure nothing is stored on the stack and that everything is "resumable". Once that is done, the rest is relatively easy.
>>>>> >>>> >>
>>>>> >>>> >> Yeah, it can probably be quite challenging.
>>>>> >>>> >>
>>>>> >>>> >> Thoughts?
>>>>> >>>> >>
>>>>> >>>> >> Cheers, Sergey
>>>>> >>>> >>
>>>>> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>>>>> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>>>>> >>>> >> [3] https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=12642361#action_12642361
>>>>> >>>> >
>>>>> >>>> > --
>>>>> >>>> > Daniel Kulp
>>>>> >>>> > [EMAIL PROTECTED]
>>>>> >>>> > http://dankulp.com/blog
>>>>> >>>
>>>>> >>> --
>>>>> >>> Daniel Kulp
>>>>> >>> [EMAIL PROTECTED]
>>>>> >>> http://dankulp.com/blog
>>>>
>>>> --
>>>> Daniel Kulp
>>>> [EMAIL PROTECTED]
>>>> http://dankulp.com/blog
>>>
>>
>> --
>> Cheers,
>> Guillaume Nodet
>> ------------------------
>> Blog: http://gnodet.blogspot.com/
>> ------------------------
>> Open Source SOA
>> http://fusesource.com
>

--
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
------------------------
Open Source SOA
http://fusesource.com
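The SMX approach Guillaume describes uses one concurrent hash map from message id to continuation, plus synchronization on the continuation itself, and also works with blocking continuations on standard servlet servers. It might be sketched roughly as follows. This is not the HttpConsumerEndpoint code; `BlockingContinuation`, `PendingRequests` and their methods are hypothetical. The map is only used to look a continuation up. The decision between "resumed" and "timed out" is made under the continuation's own lock, so a reply arriving right around the timeout is either delivered or rejected, never both.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical blocking continuation (the kind a plain servlet container needs):
 * suspend() parks the request thread until resume() or the timeout.
 */
final class BlockingContinuation {
    private boolean resumed;
    private boolean expired;
    private Object result;

    /** Returns true if resumed, false if the timeout elapsed first. */
    synchronized boolean suspend(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!resumed) { // skipped entirely if resume() came before suspend()
            long left = deadline - System.nanoTime();
            if (left <= 0) {
                expired = true; // decided under the lock, so resume() cannot also win
                return false;
            }
            TimeUnit.NANOSECONDS.timedWait(this, left);
        }
        return true;
    }

    /** Returns false if the request already timed out (or was already resumed). */
    synchronized boolean resume(Object value) {
        if (expired || resumed) {
            return false;
        }
        result = value;
        resumed = true;
        notifyAll();
        return true;
    }

    synchronized Object result() {
        return result;
    }
}

/** Hypothetical registry: one concurrent map from message id to continuation. */
final class PendingRequests {
    private final Map<String, BlockingContinuation> pending = new ConcurrentHashMap<>();

    /** Register before the request is sent out, so even an immediate reply finds it. */
    BlockingContinuation register(String messageId) {
        BlockingContinuation cont = new BlockingContinuation();
        pending.put(messageId, cont);
        return cont;
    }

    /** Called by the reply thread; false for an unknown id or a reply that came too late. */
    boolean onReply(String messageId, Object reply) {
        BlockingContinuation cont = pending.remove(messageId);
        return cont != null && cont.resume(reply);
    }

    /** Called by the request thread once it is done, whatever the outcome. */
    void forget(String messageId) {
        pending.remove(messageId);
    }
}

public class PendingRequestsDemo {
    public static void main(String[] args) throws InterruptedException {
        PendingRequests registry = new PendingRequests();

        // The reply arrives before the request thread even suspends: nothing is lost.
        BlockingContinuation fast = registry.register("m1");
        registry.onReply("m1", "reply-1");
        System.out.println(fast.suspend(1000) + " " + fast.result());
        registry.forget("m1");

        // The timeout fires first; a reply squeezing in before cleanup is rejected.
        BlockingContinuation slow = registry.register("m2");
        boolean resumed = slow.suspend(20);
        boolean accepted = registry.onReply("m2", "late");
        registry.forget("m2");
        System.out.println(resumed + " " + accepted);
    }
}
```

The demo prints `true reply-1` and then `false false`. Registering before the request leaves the process is what closes the "resumed before suspended" window; the flag checks inside the synchronized methods handle the "reply around the timeout" case.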
