Author: gertv
Date: Tue Jun 7 07:59:21 2011
New Revision: 1132898
URL: http://svn.apache.org/viewvc?rev=1132898&view=rev
Log:
SMXCOMP-880: Refactor servicemix-http component to avoid use of wrapper for
locking
Added:
servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/jetty/ContinuationHelper.java
servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/jetty/ContinuationHelperTest.java
Modified:
servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
Modified:
servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java?rev=1132898&r1=1132897&r2=1132898&view=diff
==============================================================================
---
servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
(original)
+++
servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
Tue Jun 7 07:59:21 2011
@@ -55,6 +55,8 @@ import org.mortbay.util.ajax.Continuatio
import org.mortbay.util.ajax.ContinuationSupport;
import org.mortbay.util.ajax.WaitingContinuation;
+import static
org.apache.servicemix.http.jetty.ContinuationHelper.isNewContinuation;
+
/**
* Plain HTTP consumer endpoint. This endpoint can be used to handle plain
HTTP request (without SOAP) or to be able to
* process the request in a non standard way. For HTTP requests, a WSDL2 HTTP
binding can be used.
@@ -73,7 +75,8 @@ public class HttpConsumerEndpoint extend
private long timeout; // 0 => default to the timeout configured on
component
private URI defaultMep = JbiConstants.IN_OUT;
private Map<String, Object> resources = new HashMap<String, Object>();
- private Map<String, Continuation> locks = new ConcurrentHashMap<String,
Continuation>();
+ private Map<String, Continuation> continuations = new
ConcurrentHashMap<String, Continuation>();
+ private Map<String, Object> mutexes = new ConcurrentHashMap<String,
Object>();
private Object httpContext;
private boolean started = false;
@@ -224,139 +227,200 @@ public class HttpConsumerEndpoint extend
super.stop();
}
+ /*
+ * Process the reponse message exchange
+ */
public void process(MessageExchange exchange) throws Exception {
- // Receive the exchange response
- // First, check if the continuation has not been removed from the map,
- // which would mean it has timed out. If this is the case, throw an
exception
- // that will set the exchange status to ERROR.
- Continuation cont = locks.get(exchange.getExchangeId());
- if (cont == null) {
- throw new Exception("HTTP request has timed out for exchange: " +
exchange.getExchangeId());
- }
- synchronized (cont) {
- logger.debug("Resuming continuation for exchange: {}",
exchange.getExchangeId());
- // In case of the SEDA flow isn't used, the exchange could be a
different instance, so it should be updated.
- cont.setObject(exchange);
- // Resume continuation
- cont.resume();
- if (!cont.isResumed()) {
- logger.debug("Could not resume continuation for exchange: {}",
exchange.getExchangeId());
- throw new Exception("HTTP request has timed out for exchange:
" + exchange.getExchangeId());
+ final String id = exchange.getExchangeId();
+ final Object mutex = mutexes.get(id);
+
+ // if the mutex is no longer available, the HTTP request timed out
before the message exchange got handled by the ESB
+ if (mutex == null) {
+ handleLateResponse(exchange);
+ return;
+ }
+
+ // Synchronize on the mutex object while we're tinkering with the
continuation object
+ synchronized (mutex) {
+ final Continuation continuation = continuations.get(id);
+ if (continuation != null && continuation.isPending()) {
+ logger.debug("Resuming continuation for exchange: {}",
exchange.getExchangeId());
+
+ // in case of the JMS/JCA flow, you might have a different
instance of the message exchange here
+ continuation.setObject(exchange);
+
+ continuation.resume();
+
+ // if the continuation could no longer be resumed, the HTTP
request might have timed out before the message
+ // exchange got handled by the ESB
+ if (!continuation.isResumed()) {
+ handleLateResponse(exchange);
+ }
+ } else {
+ // it the continuation is no longer available or no longer
pending, the HTTP request has time out before
+ // the message exchange got handled by the ESB
+ handleLateResponse(exchange);
}
}
}
+ /*
+ * Process the HTTP request/response - this method gets invoked:
+ * - when a new HTTP request is received
+ * - when a suspended HTTP request is being resumed
+ * (either because the exchange was received or because the request
timed out)
+ */
public void process(HttpServletRequest request, HttpServletResponse
response) throws Exception {
- logger.debug("Receiving HTTP request: {}", request);
+
MessageExchange exchange = null;
+
try {
// Handle WSDLs, XSDs
if (handleStaticResource(request, response)) {
return;
}
- Continuation cont = createContinuation(request);
+ // configure the timeout
long to = this.timeout;
if (to == 0) {
to = ((HttpComponent)
getServiceUnit().getComponent()).getConfiguration().getConsumerProcessorSuspendTime();
}
- if (!cont.isPending()) {
- // Check endpoint is started
- if (!started) {
-
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Endpoint is
stopped");
- return;
- }
- // Create the exchange
- exchange = createExchange(request);
- // Put the exchange into the continuation for retrieval later.
- cont.setObject(exchange);
- // Put the continuation in a map under the exchange id key
- locks.put(exchange.getExchangeId(), cont);
- logger.debug("Suspending continuation for exchange: {}",
exchange.getExchangeId());
- synchronized (cont) {
- // Send the exchange and then suspend the request.
- send(exchange);
- // Suspend the continuation for the configured timeout
- // If a SelectConnector is used, the call to suspend
- // will throw a RetryRequest exception
- // else, the call will block until the continuation is
- // resumed
- boolean istimeout = !cont.suspend(to);
- // The call has not thrown a RetryRequest, which means
- // we don't use a SelectConnector
- // and we must handle the exchange in this very method
- // call.
- // If result is false, the continuation has timed out.
- locks.remove(exchange.getExchangeId());
-
- // Timeout if SelectConnector is not used
- if (istimeout) {
- throw new Exception("HTTP request has timed out for
exchange: " + exchange.getExchangeId());
+ final Continuation continuation =
ContinuationSupport.getContinuation(request, null);
+ exchange = (MessageExchange) continuation.getObject();
+ final Object mutex = getOrCreateMutex(exchange);
+
+ // Synchronize on the mutex object while we're tinkering with the
continuation object
+ synchronized (mutex) {
+ if (isNewContinuation(continuation)) {
+ logger.debug("Receiving HTTP request: {}", request);
+
+ // send back HTTP status 503 (Not Avaialble) to reject any
new requests if the endpoint is not started
+ if (!started) {
+
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Endpoint is
stopped");
+ return;
}
+
+ // Create the exchange
+ exchange = createExchange(request);
+ final String id = exchange.getExchangeId();
+
+ // Put the exchange into the continuation for later
retrieval and store the mutex/continuation objects
+ continuation.setObject(exchange);
+ mutexes.put(id, mutex);
+ continuations.put(id, continuation);
+
+ // Send the exchange and then suspend the HTTP request
until the message exchange gets answered
+ send(exchange);
+
+ // Right after this if-block, we will try suspending the
continuation
+ // If a SelectConnector is being used, the call to suspend
will throw a RetryRequest
+ // This will free the thread - this method will be invoked
again when the continuation gets resumed
+ logger.debug("Suspending continuation for exchange: {}",
exchange.getExchangeId());
+ } else {
+ logger.debug("Resuming HTTP request: {}", request);
}
- } else {
- // The continuation is a retry.
- // This happens when the SelectConnector is used and in two
cases:
- // * the continuation has been resumed because the exchange
has been received
- // * the continuation has timed out
- boolean istimeout = !cont.suspend(to);
- exchange = (MessageExchange) cont.getObject();
- // Remove the continuation from the map, indicating it has
been processed or timed out
- locks.remove(exchange.getExchangeId());
- // Timeout
+ boolean istimeout = !continuation.suspend(to);
+
+ // Continuation is being ended (either because the message
exchange was handled or the continuation timed out)
+ // Cleaning up the stored objects for this continuation now
+ exchange = doEndContinuation(continuation);
+
+ // Timeout if SelectConnector is not used
if (istimeout) {
throw new Exception("HTTP request has timed out for
exchange: " + exchange.getExchangeId());
}
}
- // At this point, we have received the exchange response,
- // so process it and send back the HTTP response
- if (exchange.getStatus() == ExchangeStatus.ERROR) {
- Exception e = exchange.getError();
- if (e == null) {
- e = new Exception("Unkown error (exchange aborted ?)");
- }
- throw e;
- } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- try {
- Fault fault = exchange.getFault();
- if (fault != null) {
- sendFault(exchange, fault, request, response);
- } else {
- NormalizedMessage outMsg = exchange.getMessage("out");
- if (outMsg != null) {
- sendOut(exchange, outMsg, request, response);
- }
- }
- done(exchange);
- } catch (Exception e) {
- fail(exchange, e);
- throw e;
- }
- } else if (exchange.getStatus() == ExchangeStatus.DONE) {
- // This happens when there is no response to send back
- sendAccepted(exchange, request, response);
- }
+
+ // message exchange has been completed, so we're ready to send
back an HTTP response now
+ handleResponse(exchange, request, response);
} catch (RetryRequest e) {
+ // retrow the RetryRequest to allow Jetty to re-invoke this method
when the continuation is being resumed
throw e;
} catch (Exception e) {
sendError(exchange, e, request, response);
}
}
- private Continuation createContinuation(HttpServletRequest request) {
- // not giving a specific mutex will synchronize on the continuation
itself
- Continuation continuation =
ContinuationSupport.getContinuation(request, null);
- if (continuation instanceof WaitingContinuation) {
- return continuation;
- } else {
- // wrap the continuation to avoid a deadlock between this endpoint
and the Jetty continuation timeout mechanism
- // the endpoint now synchronizes on the wrapper while Jetty
synchronizes on the continuation itself
- return new ContinuationWrapper(continuation);
+ /*
+ * Handle the HTTP response based on the information in the message
exchange we received
+ */
+ private void handleResponse(MessageExchange exchange, HttpServletRequest
request, HttpServletResponse response) throws Exception {
+ // At this point, we have received the exchange response,
+ // so process it and send back the HTTP response
+ if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ Exception e = exchange.getError();
+ if (e == null) {
+ e = new Exception("Unkown error (exchange aborted ?)");
+ }
+ throw e;
+ } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ try {
+ Fault fault = exchange.getFault();
+ if (fault != null) {
+ sendFault(exchange, fault, request, response);
+ } else {
+ NormalizedMessage outMsg = exchange.getMessage("out");
+ if (outMsg != null) {
+ sendOut(exchange, outMsg, request, response);
+ }
+ }
+ done(exchange);
+ } catch (Exception e) {
+ fail(exchange, e);
+ throw e;
+ }
+ } else if (exchange.getStatus() == ExchangeStatus.DONE) {
+ // This happens when there is no response to send back
+ sendAccepted(exchange, request, response);
}
}
+ /*
+ * Handle a message exchange that is being received after the
corresponding HTTP request has timed out
+ */
+ private void handleLateResponse(MessageExchange exchange) throws Exception
{
+ throw new Exception("HTTP request has timed out for exchange: " +
exchange.getExchangeId());
+
+ // TODO: allow multiple options for handling late response from the ESB
+ // - by throwing an exception to make the exchange end in error
+ // - by logging a warning (make sure MEP gets handled appropriately
here!)
+ }
+
+ /*
+ * Get or create an object that can be used for synchronizing code blocks
for a given exchange
+ */
+ private Object getOrCreateMutex(MessageExchange exchange) {
+ Object result = null;
+
+ // let's try to find the object that corresponds to the exchange first
+ if (exchange != null) {
+ result = mutexes.get(exchange.getExchangeId());
+ }
+
+ // no luck finding an existing object, let's create a new one
+ if (result == null) {
+ result = new Object();
+ }
+
+ return result;
+ }
+
+ /*
+ * End the continuation by removing all objects stored on/for the
continuation
+ * and returning the MessageExchange that was represented by the
continuation
+ */
+ private MessageExchange doEndContinuation(Continuation continuation) {
+ final MessageExchange exchange = (MessageExchange)
continuation.getObject();
+ final String id = exchange.getExchangeId();
+
+ continuation.setObject(null);
+ mutexes.remove(id);
+ continuations.remove(id);
+ return exchange;
+ }
+
protected void loadStaticResources() throws Exception {
}
@@ -472,49 +536,4 @@ public class HttpConsumerEndpoint extend
((DefaultHttpConsumerMarshaler)
marshaler).setDefaultMep(getDefaultMep());
}
}
-
- /*
- * Continuation wrapper just delegates everything to the underlying
Continuation
- */
- private static final class ContinuationWrapper implements Continuation {
-
- private final Continuation continuation;
-
- private ContinuationWrapper(Continuation continuation) {
- super();
- this.continuation = continuation;
- }
-
- public Object getObject() {
- return continuation.getObject();
- }
-
- public boolean isNew() {
- return continuation.isNew();
- }
-
- public boolean isPending() {
- return continuation.isPending();
- }
-
- public boolean isResumed() {
- return continuation.isResumed();
- }
-
- public void reset() {
- continuation.reset();
- }
-
- public void resume() {
- continuation.resume();
- }
-
- public void setObject(Object o) {
- continuation.setObject(o);
- }
-
- public boolean suspend(long timeout) {
- return continuation.suspend(timeout);
- }
- }
}
Added:
servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/jetty/ContinuationHelper.java
URL:
http://svn.apache.org/viewvc/servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/jetty/ContinuationHelper.java?rev=1132898&view=auto
==============================================================================
---
servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/jetty/ContinuationHelper.java
(added)
+++
servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/jetty/ContinuationHelper.java
Tue Jun 7 07:59:21 2011
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.http.jetty;
+
+import org.mortbay.util.ajax.Continuation;
+
+/**
+ * A few helper methods for working with Jetty continuations
+ */
+public class ContinuationHelper {
+
+ private ContinuationHelper() {
+ // static helper methods only - no need for a public constructor
+ }
+
+ /**
+ * Is this a new continuation object?
+ *
+ * @param continuation
+ * @return <code>true</code> for a new continuation object
+ */
+ public static boolean isNewContinuation(Continuation continuation) {
+ return continuation.isNew() ||
+ (!continuation.isPending() && !continuation.isResumed());
+ }
+}
Added:
servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/jetty/ContinuationHelperTest.java
URL:
http://svn.apache.org/viewvc/servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/jetty/ContinuationHelperTest.java?rev=1132898&view=auto
==============================================================================
---
servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/jetty/ContinuationHelperTest.java
(added)
+++
servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/jetty/ContinuationHelperTest.java
Tue Jun 7 07:59:21 2011
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.http.jetty;
+
+import junit.framework.TestCase;
+import org.mortbay.util.ajax.Continuation;
+
+import static
org.apache.servicemix.http.jetty.ContinuationHelper.isNewContinuation;
+
+/**
+ * Test case for {@link ContinuationHelper}
+ */
+public class ContinuationHelperTest extends TestCase {
+
+
+ public void testIsNewContinuation() {
+ assertTrue(isNewContinuation(new MockContinuation(true, null, null)));
+ assertTrue(isNewContinuation(new MockContinuation(false, false,
false)));
+
+ assertFalse(isNewContinuation(new MockContinuation(false, true,
false)));
+ assertFalse(isNewContinuation(new MockContinuation(false, false,
true)));
+ }
+
+ protected static final class MockContinuation implements Continuation {
+
+ private boolean isNew;
+ private Boolean pending;
+ private Boolean resumed;
+
+ protected MockContinuation(boolean isNew, Boolean pending, Boolean
resumed) {
+ super();
+ this.isNew = isNew;
+ this.pending = pending;
+ this.resumed = resumed;
+ }
+
+ public boolean suspend(long l) {
+ throw new UnsupportedOperationException("Not yet supported");
+ }
+
+ public void resume() {
+ throw new UnsupportedOperationException("Not yet supported");
+ }
+
+ public void reset() {
+ throw new UnsupportedOperationException("Not yet supported");
+ }
+
+ public boolean isNew() {
+ return isNew;
+ }
+
+ public boolean isPending() {
+ return pending;
+ }
+
+ public boolean isResumed() {
+ return resumed;
+ }
+
+ public Object getObject() {
+ throw new UnsupportedOperationException("Not yet supported");
+ }
+
+ public void setObject(Object o) {
+ throw new UnsupportedOperationException("Not yet supported");
+ }
+ }
+}