Author: chirino
Date: Tue Dec 13 17:07:08 2005
New Revision: 356666
URL: http://svn.apache.org/viewcvs?rev=356666&view=rev
Log:
Was having some wierd issues with the consumer.. disabled continuations for
now (but i don't think that was the issue).
added a semaphore to avoid multiple blocking threads (i've got a feeling this
might be what fixed it).
Modified:
incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/MessageServlet.java
incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/WebClient.java
Modified:
incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/MessageServlet.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/MessageServlet.java?rev=356666&r1=356665&r2=356666&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/MessageServlet.java
(original)
+++
incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/MessageServlet.java
Tue Dec 13 17:07:08 2005
@@ -18,12 +18,10 @@
package org.activemq.web;
-import org.activemq.MessageAvailableConsumer;
-import org.activemq.MessageAvailableListener;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.mortbay.util.ajax.Continuation;
-import org.mortbay.util.ajax.ContinuationSupport;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.LinkedList;
+import java.util.List;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -36,10 +34,12 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.LinkedList;
-import java.util.List;
+import org.activemq.MessageAvailableConsumer;
+import org.activemq.MessageAvailableListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.mortbay.util.ajax.Continuation;
+import org.mortbay.util.ajax.ContinuationSupport;
/**
* A servlet for sending and receiving messages to/from JMS destinations using
@@ -110,7 +110,7 @@
* from a queue
*/
protected void doDelete(HttpServletRequest request, HttpServletResponse
response) throws ServletException, IOException {
- doMessages(request, response, 1);
+ doMessagesWithoutContinuation(request, response, 1);
}
/**
@@ -118,7 +118,7 @@
* from a queue
*/
protected void doGet(HttpServletRequest request, HttpServletResponse
response) throws ServletException, IOException {
- doMessages(request, response, -1);
+ doMessagesWithoutContinuation(request, response, -1);
}
/**
@@ -229,6 +229,106 @@
throw new ServletException("Could not post JMS message: " + e, e);
}
finally {
+ if (log.isDebugEnabled()) {
+ log.debug("Received " + messages + " message(s)");
+ }
+ }
+ }
+
+ /**
+ * Reads a message from a destination up to some specific timeout period
+ *
+ * @param request
+ * @param response
+ * @throws ServletException
+ * @throws IOException
+ */
+ protected void doMessagesWithoutContinuation(HttpServletRequest request,
HttpServletResponse response,
+ int maxMessages) throws ServletException, IOException {
+
+ int messages = 0;
+ try {
+ WebClient client = getWebClient(request);
+ Destination destination = getDestination(client, request);
+ long timeout = getReadTimeout(request);
+ boolean ajax = isRicoAjax(request);
+ if (!ajax)
+ maxMessages = 1;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Receiving message(s) from: " + destination + " with
timeout: " + timeout);
+ }
+
+ MessageAvailableConsumer consumer = (MessageAvailableConsumer)
client.getConsumer(destination);
+ Continuation continuation = null;
+ Listener listener = null;
+ Message message = null;
+
+ // write a responds
+ response.setContentType("text/xml");
+ PrintWriter writer = response.getWriter();
+
+ if (ajax)
+ writer.println("<ajax-response>");
+
+ // Only one client thread at a time should poll for messages.
+ if (client.getSemaphore().tryAcquire()) {
+ try {
+ // Look for any available messages
+ message = consumer.receive(timeout);
+
+ // handle any message(s)
+ if (message == null) {
+ // No messages so OK response of for ajax else no
+ // content.
+ response.setStatus(ajax ? HttpServletResponse.SC_OK :
HttpServletResponse.SC_NO_CONTENT);
+ } else {
+ // We have at least one message so set up the
+ // response
+ response.setStatus(HttpServletResponse.SC_OK);
+ String type = getContentType(request);
+ if (type != null)
+ response.setContentType(type);
+
+ // send a response for each available message (up to
+ // max
+ // messages)
+ while ((maxMessages < 0 || messages < maxMessages) &&
message != null) {
+ //
System.err.println("message["+messages+"]="+message);
+ if (ajax) {
+ writer.print("<response type='object' id='");
+ writer.print(request.getParameter("id"));
+ writer.println("'>");
+ } else
+ // only ever 1 message for non ajax!
+ setResponseHeaders(response, message);
+
+ writeMessageResponse(writer, message);
+
+ if (ajax)
+ writer.println("</response>");
+
+ // look for next message
+ message = consumer.receiveNoWait();
+ messages++;
+ }
+ }
+ } finally {
+ client.getSemaphore().release();
+ }
+ } else {
+ // Client is using us in another thread.
+ response.setStatus(ajax ? HttpServletResponse.SC_OK :
HttpServletResponse.SC_NO_CONTENT);
+ }
+
+ if (ajax) {
+ writer.println("<response type='object'
id='poll'><ok/></response>");
+ writer.println("</ajax-response>");
+ }
+
+ } catch (JMSException e) {
+ throw new ServletException("Could not post JMS message: " + e, e);
+ } finally {
if (log.isDebugEnabled()) {
log.debug("Received " + messages + " message(s)");
}
Modified:
incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/WebClient.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/WebClient.java?rev=356666&r1=356665&r2=356666&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/WebClient.java
(original)
+++
incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/WebClient.java
Tue Dec 13 17:07:08 2005
@@ -18,13 +18,12 @@
package org.activemq.web;
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-
-import org.activemq.ActiveMQConnection;
-import org.activemq.ActiveMQConnectionFactory;
-import org.activemq.ActiveMQSession;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.Map;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
@@ -40,12 +39,14 @@
import javax.servlet.http.HttpSessionActivationListener;
import javax.servlet.http.HttpSessionEvent;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.HashMap;
-import java.util.Map;
+import org.activemq.ActiveMQConnection;
+import org.activemq.ActiveMQConnectionFactory;
+import org.activemq.ActiveMQSession;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
/**
* Represents a messaging client used from inside a web container
@@ -72,6 +73,8 @@
private transient Map topicConsumers = new ConcurrentHashMap();
private int deliveryMode = DeliveryMode.NON_PERSISTENT;
+ private final Semaphore semaphore = new Semaphore(1);
+
/**
* @return the web client for the current HTTP session or null if there is
not a web client created yet
@@ -246,5 +249,9 @@
protected static class SessionConsumerPair {
public Session session;
public MessageConsumer consumer;
+ }
+
+ public Semaphore getSemaphore() {
+ return semaphore;
}
}