Cheers, Sergey
> That said, I'm now trying to inject a message as a custom continuation
> object (while preserving the original one if any, both ways) as early as
> possible, in AbstractInvoker, so the time window at which the race
> condition I talked about earlier can cause the loss of the original
> message, is extremely small the time it taked for the
> continuation.suspend() exception to reach a catch block in
> AbstractInvoker.
>
> Cheers, Sergey
>
>> Hi,
>>
>> I did some system testing with Jetty continuations and it's going not
>> too bad. Here's one issue which I've encountered which might or might
>> not be a problem in cases where continuations are ustilized directly
>> (that is without our wrappers), as in case of say ServiceMix CXF binding
>> component.
>>
>> The problem is that when continuation.suspend(timeout) has been called,
>> a resulting RuntimeException might not reach CXF JettyDestination (such
>> that the original message with its phase chain can be preserved until
>> the request is resumed) if some other application thread calls
>> continuation.resume() or continuation suspend timeout expires.
>>
>> In case of ServiceMix the latter is a theoretical possibility at the
>> least. I can see in its code this timeout is configured, but if this
>> timeout is in the region of up to 1 sec or so then it's feasible that
>> with a heavy workload the race condition described above might come to
>> life.
>>
>> That said, as part of my test, I found that even when such condition
>> occurs, the 'worst' thing which can happen is that a new message and a
>> new chain are created, that is, the request is not resumed from a
>> 'suspended' ServiceInvokerInterceptor, but starts as if it was a new
>> request alltogether, but it all works nonetheless, as all the stack
>> variables used in various interceptors in my given test at least are all
>> obtained from a message. The only downside is that that the work which
>> has already been done earlier as part of handling the suspended request
>> is repeated again by the interceptors. It can cause issues though in
>> cases when some interceptors have sideeffects as part of handling a
>> given input request, say modify a db, etc
>>
>> Now, this race condition can be safely avoided if a wrapper proposed by
>> Dan is used by a server application code as the message can be preserved
>> immediately at a point a user calls suspend on our wrapper, so without
>> further doubts I've prototyped it too. It's not possible for SMX
>> components though
>>
>> Comments ?
>>
>> Cheers, Sergey
>>
>>> I guess my thinking was to tie the continutations directly to the
>>> PhaseInterceptorChain (since that is going to need to know about them
>>> anyway). However, I suppose it could easily be done with a new
>>> interface. Probably the best thing to do is to stub out a sample
>>> usecase. So here goes.....
>>>
>>> Lets take a "GreetMe" web service that in the greetMe method will call
>>> off asynchrously to some JMS service to actually get the result.
>>>
>>> @Resource(name = "jmsClient")
>>> Greeter jmsGreeter
>>> @Resource
>>> WebServiceContext context;
>>> public String greetMe(String arg) {
>>> ContinuationSupport contSupport = (ContinuationSupport)
>>> context.get(ContinuationSupport.class.getName());
>>> if (contSupport == null) {
>>> //continuations not supported, must wait
>>> return jmsGreeter.greetMe(arg);
>>> }
>>> Continuation cont = contSupport.getContinuation();
>>> if (cont.isResumed()) {
>>> AsyncHandler<GreetMeResponse> handler = cont.getObject();
>>> return handler.get().getReturn();
>>> } else {
>>> AsyncHandler<GreetMeResponse> handler = new Handler(cont);
>>> jmsGreeter.greetMeAsync(arg, handler);
>>> cont.suspend(handler);
>>> return null; //won't actually get here as suspend will throw a
>>> ContinuationException
>>> }
>>> }
>>>
>>> The Handler would look something like:
>>> class Handler implements AsyncHandler<GreetMeResponse> {
>>> GreetMeResponse resp;
>>> Continuation cont;
>>> public Handler(Continuation cont) {
>>> this.cont = cont;
>>> }
>>> public void handleResponse(Response<GreetMeLaterResponse>
>>> response) { resp = response.get();
>>> cont.resume();
>>> }
>>> }
>>>
>>> Basically, the HTTP/Jetty transport could provide an implementation of
>>> ContinuationSupport that wrappers the jetty stuff. JMS could provide
>>> one that's pretty much a null op. Transports that cannot support it
>>> (like servlet) just wouldn't provide an implementation.
>>>
>>>
>>> Does that make sense? Other ideas?
>>>
>>> Dan
>>>
>>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>>>> > No. We don't want that. Whatever we do should work for other
>>>> > transports as well like JMS. Thus, this shouldn't be tied to jetty
>>>> > continuations directly.
>>>>
>>>> No, I'm not suggesting to tie it up to jetty continuations.
>>>> Ex.
>>>>
>>>> try {
>>>> invoke(); // continuation.suspend() somehow by the code being
>>>> invoked upon }
>>>> catch (RuntimeException ex) {
>>>>
>>>> if
>>>> (ex.getClass().getName().equals("jetty.JettyContinuationException"))
>>>> throw new SuspendedFault(ex);
>>>> // or PhaseInterceptorChain.suspend()
>>>> }
>>>> }
>>>>
>>>> > Most likely, we could add a "suspend()" method to
>>>> > PhaseInterceptorChain that would do something very similar and throw
>>>> > a "SuspendException" or something in the same package as
>>>> > PhaseInterceptorChain.
>>>>
>>>> When do we trigger this PhaseInterceptorChain.suspend() call though ?
>>>>
>>>> > That would get propogated
>>>> > back to the JettyDestination that could then call the jetty things.
>>>> > The JMS transport could just catch it and more or less ignore it.
>>>> > We'd then have to add a "resume()" method to the chain which would
>>>> > call back onto a listener that the transport provides. Jetty would
>>>> > just call the jetty resume stuff. JMS would probably put a runnable
>>>> > on the workqueue to restart the chain.
>>>>
>>>> ok
>>>>
>>>> > Also, suspend() would need to check if there is a listener. If not,
>>>> > it should not throw the exception. Thus, the servlet transport and
>>>> > CORBA stuff that couldn't do this would pretty much just ignore it.
>>>>
>>>> ok, not sure I understand about the listener but I think I see what
>>>> you mean...
>>>>
>>>> > Basically, this needs to be done in such a way that it CAN work for
>>>> > the non-jetty cases. However, it also needs to be done in a way
>>>> > that doesn't affect existing transports.
>>>>
>>>> +1
>>>>
>>>> Cheers, Sergey
>>>>
>>>> > Dan
>>>> >
>>>> >> 2. Now, if the above can be figured out, the next problem arises:
>>>> >> when the "trigger" to wake up the continuation occurs
>>>> >>
>>>> >> I think we can can do in JettyDestination omething similar to what
>>>> >> is done in SMX. When getting a SuspendedFault exception, we can
>>>> >> extract from it the original continuation instance or else we can
>>>> >> do ContinuationSupport.getContinuation(request) which should return
>>>> >> us the instance. At this point we can use it as a ket to store the
>>>> >> current exchange plus all the other info we may need.
>>>> >>
>>>> >> When the user/application code does continuation.resume(), the
>>>> >> Jetty thread will come back and we will use the
>>>> >> ContinuationSupport.getContinuation(request) to get us the active
>>>> >> continuation and use it to extract the suspended exchange and
>>>> >> proceed from there, say we'll call PhaseInterceptorPhase.resume(),
>>>> >> etc, something along the lines you suggested
>>>> >>
>>>> >>
>>>> >> 3. Basically, to do this "right", we'd need to audit pretty much
>>>> >> everything to make sure nothing is stored on the stack and is
>>>> >> "resumable". Once that is done, the rest is relatively easy.
>>>> >>
>>>> >> Yea - probably can be the quite challenging
>>>> >>
>>>> >>
>>>> >> Thoughts ?
>>>> >>
>>>> >> Cheers, Sergey
>>>> >>
>>>> >>
>>>> >>
>>>> >>
>>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>>>> >> [3]
>>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=126
>>>> >>42361 #ac tion_12642361
>>>> >
>>>> > --
>>>> > Daniel Kulp
>>>> > [EMAIL PROTECTED]
>>>> > http://dankulp.com/blog
>>>
>>> --
>>> Daniel Kulp
>>> [EMAIL PROTECTED]
>>> http://dankulp.com/blog