Updated Branches: refs/heads/trunk e56c062f2 -> a2ede974b
https://issues.apache.org/jira/browse/AMQ-4938 apply patch with cleanups Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a2ede974 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a2ede974 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a2ede974 Branch: refs/heads/trunk Commit: a2ede974b95261885404a02f32a918dbd9213f51 Parents: e56c062 Author: Timothy Bish <[email protected]> Authored: Thu Jan 16 10:19:10 2014 -0500 Committer: Timothy Bish <[email protected]> Committed: Thu Jan 16 10:19:41 2014 -0500 ---------------------------------------------------------------------- .../org/apache/activemq/web/MessageServlet.java | 199 +++++++++++++------ 1 file changed, 134 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/a2ede974/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java ---------------------------------------------------------------------- diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java index 3c0dad2..63056e9 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.Enumeration; import java.util.HashMap; +import java.util.HashSet; import javax.jms.Destination; import javax.jms.JMSException; @@ -42,11 +43,18 @@ import org.slf4j.LoggerFactory; /** * A servlet for sending and receiving messages to/from JMS destinations using - * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the - * destination and whether it is a topic or queue via configuration details on - * the servlet or as request parameters. <p/> For reading messages you can - * specify a readTimeout parameter to determine how long the servlet should - * block for. + * HTTP POST for sending and HTTP GET for receiving. + * <p/> + * You can specify the destination and whether it is a topic or queue via + * configuration details on the servlet or as request parameters. + * <p/> + * For reading messages you can specify a readTimeout parameter to determine how + * long the servlet should block for. + * + * One thing to keep in mind with this solution - due to the nature of REST, + * there will always be a chance of losing messages. Consider what happens when + * a message is retrieved from the broker but the web call is interrupted before + * the client receives the message in the response - the message is lost. */ public class MessageServlet extends MessageServletSupport { @@ -59,12 +67,15 @@ public class MessageServlet extends MessageServletSupport { private static final Logger LOG = LoggerFactory.getLogger(MessageServlet.class); private final String readTimeoutParameter = "readTimeout"; + private final String readTimeoutRequestAtt = "xamqReadDeadline"; + private final String oneShotParameter = "oneShot"; private long defaultReadTimeout = -1; private long maximumReadTimeout = 20000; private long requestTimeout = 1000; private String defaultContentType = "application/xml"; private final HashMap<String, WebClient> clients = new HashMap<String, WebClient>(); + private final HashSet<MessageAvailableConsumer> activeConsumers = new HashSet<MessageAvailableConsumer>(); @Override public void init() throws ServletException { @@ -144,7 +155,7 @@ public class MessageServlet extends MessageServletSupport { } /** - * Supports a HTTP DELETE to be equivlanent of consuming a singe message + * Supports a HTTP DELETE to be equivalent of consuming a singe message * from a queue */ @Override @@ -153,7 +164,7 @@ public class MessageServlet extends MessageServletSupport { } /** - * Supports a HTTP DELETE to be equivlanent of consuming a singe message + * Supports a HTTP DELETE to be equivalent of consuming a singe message * from a queue */ @Override @@ -170,69 +181,115 @@ public class MessageServlet extends MessageServletSupport { * @throws IOException */ protected void doMessages(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + MessageAvailableConsumer consumer = null; + try { WebClient client = getWebClient(request); Destination destination = getDestination(client, request); if (destination == null) { throw new NoDestinationSuppliedException(); } - MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName)); + consumer = (MessageAvailableConsumer) client.getConsumer(destination, request.getHeader(WebClient.selectorName)); + Continuation continuation = ContinuationSupport.getContinuation(request); + + // Don't allow concurrent use of the consumer. Do make sure to allow + // subsequent calls on continuation to use the consumer. + if (continuation.isInitial()) { + synchronized (activeConsumers) { + if (activeConsumers.contains(consumer)) { + throw new ServletException("Concurrent access to consumer is not supported"); + } else { + activeConsumers.add(consumer); + } + } + } + Message message = null; - message = (Message)request.getAttribute("message"); - if (message != null) { - // we're resuming continuation, - // so just write the message and return - writeResponse(request, response, message); - return; + + long deadline = getReadDeadline(request); + long timeout = deadline - System.currentTimeMillis(); + + // Set the message available listener *before* calling receive to eliminate any + // chance of a missed notification between the time receive() completes without + // a message and the time the listener is set. + synchronized (consumer) { + Listener listener = (Listener) consumer.getAvailableListener(); + if (listener == null) { + listener = new Listener(consumer); + consumer.setAvailableListener(listener); + } } - long timeout = getReadTimeout(request); if (LOG.isDebugEnabled()) { LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout); } - Continuation continuation = null; - Listener listener = null; - - // Look for any available messages (need a little timeout) - message = consumer.receive(10); + // Look for any available messages (need a little timeout). Always + // try at least one lookup; don't block past the deadline. + if (timeout <= 0) { + message = consumer.receiveNoWait(); + } else if (timeout < 10) { + message = consumer.receive(timeout); + } else { + message = consumer.receive(10); + } - // Get an existing Continuation or create a new one if there are - // no events. if (message == null) { - continuation = ContinuationSupport.getContinuation(request); + handleContinuation(request, response, client, destination, consumer, deadline); + } else { + writeResponse(request, response, message); + closeConsumerOnOneShot(request, client, destination); - if (continuation.isExpired()) { - response.setStatus(HttpServletResponse.SC_NO_CONTENT); - return; + synchronized (activeConsumers) { + activeConsumers.remove(consumer); } + } + } catch (JMSException e) { + throw new ServletException("Could not post JMS message: " + e, e); + } + } - continuation.setTimeout(timeout); - continuation.suspend(); + protected void handleContinuation(HttpServletRequest request, HttpServletResponse response, WebClient client, Destination destination, + MessageAvailableConsumer consumer, long deadline) { + // Get an existing Continuation or create a new one if there are no events. + Continuation continuation = ContinuationSupport.getContinuation(request); - // Fetch the listeners - listener = (Listener)consumer.getAvailableListener(); - if (listener == null) { - listener = new Listener(consumer); - consumer.setAvailableListener(listener); + long timeout = deadline - System.currentTimeMillis(); + if ((continuation.isExpired()) || (timeout <= 0)) { + // Reset the continuation on the available listener for the consumer to prevent the + // next message receipt from being consumed without a valid, active continuation. + synchronized (consumer) { + Object obj = consumer.getAvailableListener(); + if (obj instanceof Listener) { + ((Listener) obj).setContinuation(null); } - - // register this continuation with our listener. - listener.setContinuation(continuation); } + response.setStatus(HttpServletResponse.SC_NO_CONTENT); + closeConsumerOnOneShot(request, client, destination); + synchronized (activeConsumers) { + activeConsumers.remove(consumer); + } + return; + } - writeResponse(request, response, message); - } catch (JMSException e) { - throw new ServletException("Could not post JMS message: " + e, e); + continuation.setTimeout(timeout); + continuation.suspend(); + + synchronized (consumer) { + Listener listener = (Listener) consumer.getAvailableListener(); + + // register this continuation with our listener. + listener.setContinuation(continuation); } } protected void writeResponse(HttpServletRequest request, HttpServletResponse response, Message message) throws IOException, JMSException { int messages = 0; try { - response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate"); // HTTP 1.1 - response.setHeader("Pragma", "no-cache"); // HTTP 1.0 - response.setDateHeader("Expires", 0); + response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate"); // HTTP + // 1.1 + response.setHeader("Pragma", "no-cache"); // HTTP 1.0 + response.setDateHeader("Expires", 0); // write a responds PrintWriter writer = response.getWriter(); @@ -269,7 +326,7 @@ public class MessageServlet extends MessageServletSupport { protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException { if (message instanceof TextMessage) { - TextMessage textMsg = (TextMessage)message; + TextMessage textMsg = (TextMessage) message; String txt = textMsg.getText(); if (txt != null) { if (txt.startsWith("<?")) { @@ -278,7 +335,7 @@ public class MessageServlet extends MessageServletSupport { writer.print(txt); } } else if (message instanceof ObjectMessage) { - ObjectMessage objectMsg = (ObjectMessage)message; + ObjectMessage objectMsg = (ObjectMessage) message; Object object = objectMsg.getObject(); if (object != null) { writer.print(object.toString()); @@ -288,7 +345,7 @@ public class MessageServlet extends MessageServletSupport { protected boolean isXmlContent(Message message) throws JMSException { if (message instanceof TextMessage) { - TextMessage textMsg = (TextMessage)message; + TextMessage textMsg = (TextMessage) message; String txt = textMsg.getText(); if (txt != null) { // assume its xml when it starts with < @@ -304,7 +361,7 @@ public class MessageServlet extends MessageServletSupport { public WebClient getWebClient(HttpServletRequest request) { String clientId = request.getParameter("clientId"); if (clientId != null) { - synchronized(this) { + synchronized (this) { LOG.debug("Getting local client [" + clientId + "]"); WebClient client = clients.get(clientId); if (client == null) { @@ -338,9 +395,9 @@ public class MessageServlet extends MessageServletSupport { response.setHeader("id", message.getJMSMessageID()); // Return JMS properties as header values. - for(Enumeration names = message.getPropertyNames(); names.hasMoreElements();) { + for (Enumeration names = message.getPropertyNames(); names.hasMoreElements();) { String name = (String) names.nextElement(); - response.setHeader(name , message.getObjectProperty(name).toString()); + response.setHeader(name, message.getObjectProperty(name).toString()); } } @@ -348,17 +405,37 @@ public class MessageServlet extends MessageServletSupport { * @return the timeout value for read requests which is always >= 0 and <= * maximumReadTimeout to avoid DoS attacks */ - protected long getReadTimeout(HttpServletRequest request) { - long answer = defaultReadTimeout; + protected long getReadDeadline(HttpServletRequest request) { + Long answer; - String name = request.getParameter(readTimeoutParameter); - if (name != null) { - answer = asLong(name); + answer = (Long) request.getAttribute(readTimeoutRequestAtt); + + if (answer == null) { + long timeout = defaultReadTimeout; + String name = request.getParameter(readTimeoutParameter); + if (name != null) { + timeout = asLong(name); + } + if (timeout < 0 || timeout > maximumReadTimeout) { + timeout = maximumReadTimeout; + } + + answer = Long.valueOf(System.currentTimeMillis() + timeout); } - if (answer < 0 || answer > maximumReadTimeout) { - answer = maximumReadTimeout; + return answer.longValue(); + } + + /** + * Close the consumer if one-shot mode is used on the given request. + */ + protected void closeConsumerOnOneShot(HttpServletRequest request, WebClient client, Destination dest) { + if (asBoolean(request.getParameter(oneShotParameter), false)) { + try { + client.closeConsumer(dest); + } catch (JMSException jms_exc) { + LOG.warn("JMS exception on closing consumer after request with one-shot mode", jms_exc); + } } - return answer; } /* @@ -386,17 +463,9 @@ public class MessageServlet extends MessageServletSupport { synchronized (this.consumer) { if (continuation != null) { - try { - Message message = consumer.receiveNoWait(); - continuation.setAttribute("message", message); - } catch (Exception e) { - LOG.warn("Error receiving message due " + e.getMessage() + ". This exception is ignored.", e); - } finally { - continuation.resume(); - } + continuation.resume(); } } } } - }
