Sure, will commit soon.
Thanks
Freeman
Guillaume Nodet wrote:
Freeman, I think this modification should be done on the
ConsumerEndpoint too in servicemix-http...

On Tue, Mar 31, 2009 at 04:13,  <[email protected]> wrote:
Author: ffang
Date: Tue Mar 31 02:13:00 2009
New Revision: 760291

URL: http://svn.apache.org/viewvc?rev=760291&view=rev
Log:
[SMXCOMP-493]STFlow doesn't work with servicemix-http/servicemix-cxf-bc

Modified:
   
servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java

Modified: 
servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
URL: 
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=760291&r1=760290&r2=760291&view=diff
==============================================================================
--- 
servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
 (original)
+++ 
servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
 Tue Mar 31 02:13:00 2009
@@ -71,6 +71,7 @@
    protected Map<String, MessageExchange> exchanges;
    protected int suspentionTime = 60000;
    protected boolean started = false;
+    private boolean isSTFlow;

    public ConsumerProcessor(HttpEndpoint endpoint) {
        super(endpoint);
@@ -96,21 +97,27 @@
        if (cont == null) {
            throw new Exception("HTTP request has timed out");
        }
-        synchronized (cont) {
-            if (locks.remove(exchange.getExchangeId()) == null) {
-                throw new Exception("HTTP request has timed out");
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("Resuming continuation for exchange: " + 
exchange.getExchangeId());
-            }
-            exchanges.put(exchange.getExchangeId(), exchange);
-            cont.resume();
-            if (!cont.isResumed()) {
+
+        if (!cont.isPending()) {
+            isSTFlow = true;
+        } else {
+            isSTFlow = false;
+            synchronized (cont) {
+                if (locks.remove(exchange.getExchangeId()) == null) {
+                    throw new Exception("HTTP request has timed out");
+                }
                if (log.isDebugEnabled()) {
-                    log.debug("Could not resume continuation for exchange: " + 
exchange.getExchangeId());
+                    log.debug("Resuming continuation for exchange: " + 
exchange.getExchangeId());
+                }
+                exchanges.put(exchange.getExchangeId(), exchange);
+                cont.resume();
+                if (!cont.isResumed()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Could not resume continuation for exchange: 
" + exchange.getExchangeId());
+                    }
+                    exchanges.remove(exchange.getExchangeId());
+                    throw new Exception("HTTP request has timed out for exchange: 
" + exchange.getExchangeId());
                }
-                exchanges.remove(exchange.getExchangeId());
-                throw new Exception("HTTP request has timed out for exchange: 
" + exchange.getExchangeId());
            }
        }
    }
@@ -168,15 +175,22 @@
                request.setAttribute(MessageExchange.class.getName(), 
exchange.getExchangeId());
                synchronized (cont) {
                    channel.send(exchange);
-                    if (log.isDebugEnabled()) {
-                        log.debug("Suspending continuation for exchange: " + 
exchange.getExchangeId());
-                    }
-                    boolean result = cont.suspend(suspentionTime);
-                    exchange = exchanges.remove(exchange.getExchangeId());
-                    request.removeAttribute(MessageExchange.class.getName());
-                    if (!result) {
-                        locks.remove(exchange.getExchangeId());
-                        throw new Exception("Exchange timed out");
+                    if (!isSTFlow) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Suspending continuation for exchange: " 
+ exchange.getExchangeId());
+                        }
+                        boolean result = cont.suspend(suspentionTime);
+                        exchange = exchanges.remove(exchange.getExchangeId());
+                        
request.removeAttribute(MessageExchange.class.getName());
+                        if (!result) {
+                            locks.remove(exchange.getExchangeId());
+                            throw new Exception("Exchange timed out");
+                        }
+                    } else {
+                        String id = (String) 
request.getAttribute(MessageExchange.class.getName());
+                        locks.remove(id);
+                        exchange = exchanges.remove(id);
+                        
request.removeAttribute(MessageExchange.class.getName());
                    }
                }
            } catch (RetryRequest retry) {







Reply via email to