You need to associate the continuation with the message before the
suspend() method is called, so that whenever the message is ready, it
will work.
Also, I think you will have to care about timeouts ...
Another thing: it would be nice if you could create a branch and
commit your ongoing work there so that we can have something more
tangible to discuss on ... ;-)  We may has well just drop it later, it
does not really matter.

On Wed, Nov 12, 2008 at 11:21 AM, Sergey Beryozkin
<[EMAIL PROTECTED]> wrote:
> Hi,
>
> I have had a look. At the moment I don't see why we would have to do this
> sort of sophisticated handling of continuations in CXF JettyDestination.
> With CXF, it's the the code being invoked further down the line (be it SMX
> CXF binding components or application code) which needs to worry about doing
> either suspending or resuming continuations.
>
> As far as CXF is concerned, it only needs to be able to associate a given
> inbound message with a continuation instance. I reckon saving it as a
> continuation user object (preserving the previously set one if any) is a
> lighter/simpler alternative than introducing maps in the JettyDestination.
>
> However, as I said few times earlier in this thread, there's a race
> condition which I observe in certain conditions. Specifically, I have a test
> where a continuation is resumed virtually immediately after it's been
> suspended so before the code dealing with associating this suspended
> continuation with the inbound message has a chance to do it, the
> continuation.resume() has already occured. In CXF case I believe it can
> happen irrespectively of how we write the code dealing with continuations
> under the hood. It won't happen if continuation wrappers are used by the
> application code.
>
> Do you have any comments about this race condition ? Or how a code you
> linked to can help to avoid it ?
>
> Cheers, Sergey
>
>
>
>
>> I would really encourage you to take a look at the smx code for
>> handling continuations.
>> We've had quite a hard time to handle race conditions, timeouts etc...
>> because the continuation has a timeout and when the message is
>> received back around the timeout, things can become a bit tricky.
>>
>>
>> https://svn.apache.org/repos/asf/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
>>
>> We use one concurrent hash map to associate a message id to a
>> continuation and multiple synchronization blocks on the continuation
>> itself.
>> Also the above code can be used with standard servlet servers (i.e.
>> when the continuation is a blocking continuation) which is imho a good
>> thing.
>>
>> On Tue, Nov 11, 2008 at 6:51 PM, Sergey Beryozkin
>> <[EMAIL PROTECTED]> wrote:
>>>
>>> Hi
>>>
>>>>>
>>>>> I have 10 threads involved, 5 control ones + 5 application ones, I see
>>>>> a
>>>>> loss of message approximately once in 5 cases. The fact that
>>>>> cont.resume()
>>>>> is done virtually immediately after cont.suspend() can explain it.
>>>>
>>>> Without seeing your code, I cannot really offer valid suggestions, but
>>>> I'll
>>>> try....   :-)
>>>
>>> I guess having it all on a branch would be handy then :-)
>>>
>>>>
>>>> One thought was in the Continuation object, record if "resume()" has
>>>> been
>>>> called and if it's been callled by the time the stack unwinds back into
>>>> the
>>>> Http transport, just re-dispatch immediately.   Either that or have the
>>>> resume block until the http transport sets a "ready to resume" flag just
>>>> before it allows the exception to flow back into jetty.
>>>
>>> I have 2 tests.
>>>
>>> In one test an application server code interacts with a wrapper, both
>>> when
>>> getting a continuation instance and when calling suspend/resume on it (as
>>> suggested by yourself earlier in this thread). In this case, under the
>>> hood,
>>> an inbound message is associated with a continuation instance before
>>> suspend() is called on it. Thus even if the resulting exception does not
>>> reach Jetty Destination in time before continuation.resume() is called by
>>> a
>>> control thread, the message is not lost when the HTTP request is resumed
>>> as
>>> that HTTP request had this continuation instance associated with it at a
>>> time ContinuationsSupport.getContinuations(request) was called.
>>>
>>> In other test which I believe represents an integration scenario with SMX
>>> better, an application server code calls Jetty
>>> ContinuationsSupport.getContinuations(request) followed by
>>> continuation.suspend(). Now, in this case, before a (Jetty RetryRequest)
>>> runtime exception reaches a catch block in AbstractInvoker (where I try
>>> to
>>> associate a message with continuation), one or two control threads manage
>>> to
>>> squeeze in and call resume() before catch block has even been processed.
>>> So
>>> by the time the wrapped exception reaches JettyDestination a request with
>>> a
>>> resumed continuation has already come back...
>>>
>>> Does this explanation for a second case and the associated race condition
>>> sounds reasonable ?
>>>
>>> Cheers, Sergey
>>>
>>>
>>>
>>>
>>>
>>>>
>>>>
>>>> Dan
>>>>
>>>>>
>>>>> Cheers, Sergey
>>>>>
>>>>> > That said, I'm now trying to inject a message as a custom
>>>>> > continuation
>>>>> > object (while preserving the original one if any, both ways) as early
>>>>> > as
>>>>> > possible, in AbstractInvoker, so the time window at which the race
>>>>> > condition I talked about earlier can cause the loss of the original
>>>>> > message, is extremely small the time it taked for the
>>>>> > continuation.suspend() exception to reach a catch block in
>>>>> > AbstractInvoker.
>>>>> >
>>>>> > Cheers, Sergey
>>>>> >
>>>>> >> Hi,
>>>>> >>
>>>>> >> I did some system testing with Jetty continuations and it's going
>>>>> >> not
>>>>> >> too bad. Here's one issue which I've encountered which might or
>>>>> >> might
>>>>> >> not be a problem in cases where continuations are ustilized directly
>>>>> >> (that is without our wrappers), as in case of say ServiceMix CXF
>>>>> >> binding
>>>>> >> component.
>>>>> >>
>>>>> >> The problem is that when continuation.suspend(timeout) has been
>>>>> >> called,
>>>>> >> a resulting RuntimeException might not reach CXF JettyDestination
>>>>> >> (such
>>>>> >> that the original message with its phase chain can be preserved
>>>>> >> until
>>>>> >> the request is resumed) if some other application thread calls
>>>>> >> continuation.resume() or continuation suspend timeout expires.
>>>>> >>
>>>>> >> In case of ServiceMix the latter is a theoretical possibility at the
>>>>> >> least. I can see in its code this timeout is configured, but if this
>>>>> >> timeout is in the region of up to 1 sec or so then it's feasible
>>>>> >> that
>>>>> >> with a heavy  workload the race condition described above might come
>>>>> >> to
>>>>> >> life.
>>>>> >>
>>>>> >> That said, as part of my test, I found that even when such condition
>>>>> >> occurs, the 'worst' thing which can happen is that a new message and
>>>>> >> a
>>>>> >> new chain are created, that is, the request is not resumed from a
>>>>> >> 'suspended' ServiceInvokerInterceptor, but starts as if it was a new
>>>>> >> request alltogether, but it all works nonetheless, as all the stack
>>>>> >> variables used in various interceptors in my given test at least are
>>>>> >> all
>>>>> >> obtained from a message. The only downside is that that the work
>>>>> >> which
>>>>> >> has already been done earlier as part of handling the suspended
>>>>> >> request
>>>>> >> is repeated again by the interceptors. It can cause issues though in
>>>>> >> cases when some interceptors have sideeffects as part of handling a
>>>>> >> given input request, say modify a db, etc
>>>>> >>
>>>>> >> Now, this race condition can be safely avoided if a wrapper proposed
>>>>> >> by
>>>>> >> Dan is used by a server application code as the message can be
>>>>> >> preserved
>>>>> >> immediately at a point a user calls suspend on our wrapper, so
>>>>> >> without
>>>>> >> further doubts I've prototyped it too. It's not possible for SMX
>>>>> >> components though
>>>>> >>
>>>>> >> Comments ?
>>>>> >>
>>>>> >> Cheers, Sergey
>>>>> >>
>>>>> >>> I guess my thinking was to tie the continutations directly to the
>>>>> >>> PhaseInterceptorChain (since that is going to need to know about
>>>>> >>> them
>>>>> >>> anyway).   However, I suppose it could easily be done with a new
>>>>> >>> interface. Probably the best thing to do is to stub out a sample
>>>>> >>> usecase.   So here goes.....
>>>>> >>>
>>>>> >>> Lets take a "GreetMe" web service that in the greetMe method will
>>>>> >>> call
>>>>> >>> off asynchrously to some JMS service to actually get the result.
>>>>> >>>
>>>>> >>> @Resource(name = "jmsClient")
>>>>> >>> Greeter jmsGreeter
>>>>> >>> @Resource
>>>>> >>> WebServiceContext context;
>>>>> >>> public String greetMe(String arg) {
>>>>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
>>>>> >>>              context.get(ContinuationSupport.class.getName());
>>>>> >>>     if (contSupport == null) {
>>>>> >>>          //continuations not supported, must wait
>>>>> >>>          return jmsGreeter.greetMe(arg);
>>>>> >>>     }
>>>>> >>>     Continuation cont = contSupport.getContinuation();
>>>>> >>>     if (cont.isResumed()) {
>>>>> >>> AsyncHandler<GreetMeResponse> handler = cont.getObject();
>>>>> >>>        return handler.get().getReturn();
>>>>> >>>     } else {
>>>>> >>>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
>>>>> >>>         jmsGreeter.greetMeAsync(arg, handler);
>>>>> >>>         cont.suspend(handler);
>>>>> >>> return null;   //won't actually get here as suspend will throw a
>>>>> >>> ContinuationException
>>>>> >>>     }
>>>>> >>> }
>>>>> >>>
>>>>> >>> The Handler would look something like:
>>>>> >>> class Handler implements AsyncHandler<GreetMeResponse> {
>>>>> >>> GreetMeResponse resp;
>>>>> >>>        Continuation cont;
>>>>> >>> public Handler(Continuation cont) {
>>>>> >>>            this.cont = cont;
>>>>> >>>        }
>>>>> >>>        public void handleResponse(Response<GreetMeLaterResponse>
>>>>> >>> response) { resp = response.get();
>>>>> >>>              cont.resume();
>>>>> >>>       }
>>>>> >>> }
>>>>> >>>
>>>>> >>> Basically, the HTTP/Jetty transport could provide an implementation
>>>>> >>> of
>>>>> >>> ContinuationSupport that wrappers the jetty stuff.    JMS could
>>>>> >>> provide
>>>>> >>> one that's pretty much a null op.   Transports that cannot support
>>>>> >>> it
>>>>> >>> (like servlet) just wouldn't provide an implementation.
>>>>> >>>
>>>>> >>>
>>>>> >>> Does that make sense?   Other ideas?
>>>>> >>>
>>>>> >>> Dan
>>>>> >>>
>>>>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>>>>> >>>> > No.   We don't want that.   Whatever we do should work for other
>>>>> >>>> > transports as well like JMS.  Thus, this shouldn't be tied to
>>>>> >>>> > jetty
>>>>> >>>> > continuations directly.
>>>>> >>>>
>>>>> >>>> No, I'm not suggesting to tie it up to jetty continuations.
>>>>> >>>> Ex.
>>>>> >>>>
>>>>> >>>> try {
>>>>> >>>>   invoke(); // continuation.suspend() somehow by the code being
>>>>> >>>> invoked upon }
>>>>> >>>> catch (RuntimeException ex) {
>>>>> >>>>
>>>>> >>>> if
>>>>> >>>>
>>>>> >>>> (ex.getClass().getName().equals("jetty.JettyContinuationException"))
>>>>> >>>> throw new SuspendedFault(ex);
>>>>> >>>>     // or PhaseInterceptorChain.suspend()
>>>>> >>>> }
>>>>> >>>> }
>>>>> >>>>
>>>>> >>>> > Most likely, we could add a "suspend()" method to
>>>>> >>>> > PhaseInterceptorChain that would do something very similar and
>>>>> >>>> > throw
>>>>> >>>> > a "SuspendException" or something in the same package as
>>>>> >>>> > PhaseInterceptorChain.
>>>>> >>>>
>>>>> >>>> When do we trigger this PhaseInterceptorChain.suspend() call
>>>>> >>>> though
>>>>> >>>> ?
>>>>> >>>>
>>>>> >>>> >   That would get propogated
>>>>> >>>> > back to the JettyDestination that could then call the jetty
>>>>> >>>> > things.
>>>>> >>>> >  The JMS transport could just catch it and more or less ignore
>>>>> >>>> > it.
>>>>> >>>> >  We'd then have to add a "resume()" method to the chain which
>>>>> >>>> > would
>>>>> >>>> > call back onto a listener that the transport provides.   Jetty
>>>>> >>>> > would
>>>>> >>>> > just call the jetty resume stuff. JMS would probably put a
>>>>> >>>> > runnable
>>>>> >>>> > on the workqueue to restart the chain.
>>>>> >>>>
>>>>> >>>> ok
>>>>> >>>>
>>>>> >>>> > Also, suspend() would need to check if there is a listener.  If
>>>>> >>>> > not,
>>>>> >>>> > it should not throw the exception.   Thus, the servlet transport
>>>>> >>>> > and
>>>>> >>>> > CORBA stuff that couldn't do this would pretty much just ignore
>>>>> >>>> > it.
>>>>> >>>>
>>>>> >>>> ok, not sure I understand about the listener but I think I see
>>>>> >>>> what
>>>>> >>>> you mean...
>>>>> >>>>
>>>>> >>>> > Basically, this needs to be done in such a way that it CAN work
>>>>> >>>> > for
>>>>> >>>> > the non-jetty cases.   However, it also needs to be done in a
>>>>> >>>> > way
>>>>> >>>> > that doesn't affect existing transports.
>>>>> >>>>
>>>>> >>>> +1
>>>>> >>>>
>>>>> >>>> Cheers, Sergey
>>>>> >>>>
>>>>> >>>> > Dan
>>>>> >>>> >
>>>>> >>>> >> 2. Now, if the above can be figured out, the next problem
>>>>> >>>> >> arises:
>>>>> >>>> >> when the "trigger" to wake up the continuation occurs
>>>>> >>>> >>
>>>>> >>>> >> I think we can can do in JettyDestination omething similar to
>>>>> >>>> >> what
>>>>> >>>> >> is done in SMX. When getting a SuspendedFault exception, we can
>>>>> >>>> >> extract from it the original continuation instance or else we
>>>>> >>>> >> can
>>>>> >>>> >> do ContinuationSupport.getContinuation(request) which should
>>>>> >>>> >> return
>>>>> >>>> >> us the instance. At this point we can use it as a ket to store
>>>>> >>>> >> the
>>>>> >>>> >> current exchange plus all the other info we may need.
>>>>> >>>> >>
>>>>> >>>> >> When the user/application code does continuation.resume(), the
>>>>> >>>> >> Jetty thread will come back and we will use the
>>>>> >>>> >> ContinuationSupport.getContinuation(request) to get us the
>>>>> >>>> >> active
>>>>> >>>> >> continuation and use it to extract the suspended exchange and
>>>>> >>>> >> proceed from there, say we'll call
>>>>> >>>> >> PhaseInterceptorPhase.resume(),
>>>>> >>>> >> etc, something along the lines you suggested
>>>>> >>>> >>
>>>>> >>>> >>
>>>>> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty
>>>>> >>>> >> much
>>>>> >>>> >> everything to make sure nothing is stored on the stack and is
>>>>> >>>> >> "resumable". Once that is done, the rest is relatively easy.
>>>>> >>>> >>
>>>>> >>>> >> Yea - probably can be the quite challenging
>>>>> >>>> >>
>>>>> >>>> >>
>>>>> >>>> >> Thoughts ?
>>>>> >>>> >>
>>>>> >>>> >> Cheers, Sergey
>>>>> >>>> >>
>>>>> >>>> >>
>>>>> >>>> >>
>>>>> >>>> >>
>>>>> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>>>>> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>>>>> >>>> >> [3]
>>>>> >>>> >>
>>>>> >>>> >>
>>>>> >>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=126
>>>>> >>>> >>42361 #ac tion_12642361
>>>>> >>>> >
>>>>> >>>> > --
>>>>> >>>> > Daniel Kulp
>>>>> >>>> > [EMAIL PROTECTED]
>>>>> >>>> > http://dankulp.com/blog
>>>>> >>>
>>>>> >>> --
>>>>> >>> Daniel Kulp
>>>>> >>> [EMAIL PROTECTED]
>>>>> >>> http://dankulp.com/blog
>>>>
>>>>
>>>>
>>>> --
>>>> Daniel Kulp
>>>> [EMAIL PROTECTED]
>>>> http://dankulp.com/blog
>>>
>>>
>>
>>
>>
>> --
>> Cheers,
>> Guillaume Nodet
>> ------------------------
>> Blog: http://gnodet.blogspot.com/
>> ------------------------
>> Open Source SOA
>> http://fusesource.com
>
>



-- 
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
------------------------
Open Source SOA
http://fusesource.com

Reply via email to