Author: dejanb
Date: Thu Oct 9 06:17:17 2008
New Revision: 703161
URL: http://svn.apache.org/viewvc?rev=703161&view=rev
Log:
cumulative commit for issues AMQ-1955, AMQ-1453 and AMQ-1960
Modified:
activemq/trunk/activemq-web-demo/src/main/webapp/WEB-INF/web.xml
activemq/trunk/activemq-web/pom.xml
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
Modified: activemq/trunk/activemq-web-demo/src/main/webapp/WEB-INF/web.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/main/webapp/WEB-INF/web.xml?rev=703161&r1=703160&r2=703161&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/main/webapp/WEB-INF/web.xml (original)
+++ activemq/trunk/activemq-web-demo/src/main/webapp/WEB-INF/web.xml Thu Oct 9 06:17:17 2008
@@ -52,6 +52,14 @@
<servlet-name>MessageServlet</servlet-name>
<servlet-class>org.apache.activemq.web.MessageServlet</servlet-class>
<load-on-startup>1</load-on-startup>
+ <!--
+ Uncomment this parameter if you plan to use multiple consumers over REST
+ <init-param>
+ <param-name>destinationOptions</param-name>
+ <param-value>consumer.prefetchSize=1</param-value>
+ </init-param>
+ -->
+
</servlet>
<!-- the queue browse servlet -->
Modified: activemq/trunk/activemq-web/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web/pom.xml?rev=703161&r1=703160&r2=703161&view=diff
==============================================================================
--- activemq/trunk/activemq-web/pom.xml (original)
+++ activemq/trunk/activemq-web/pom.xml Thu Oct 9 06:17:17 2008
@@ -37,6 +37,10 @@
<artifactId>activemq-core</artifactId>
</dependency>
<dependency>
+ <groupId>${pom.groupId}</groupId>
+ <artifactId>activemq-camel</artifactId>
+ </dependency>
+ <dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activemq-core</artifactId>
<scope>test</scope>
Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java?rev=703161&r1=703160&r2=703161&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java Thu Oct 9 06:17:17 2008
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -29,12 +30,20 @@
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.activemq.MessageAvailableConsumer;
import org.apache.activemq.MessageAvailableListener;
+import org.apache.activemq.camel.converter.ActiveMQMessageConverter;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Producer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mortbay.util.ajax.Continuation;
@@ -56,6 +65,9 @@
private String readTimeoutParameter = "readTimeout";
private long defaultReadTimeout = -1;
private long maximumReadTimeout = 20000;
+ private long requestTimeout = 1000;
+
+ private HashMap<String, WebClient> clients = new HashMap<String, WebClient>();
public void init() throws ServletException {
ServletConfig servletConfig = getServletConfig();
@@ -67,6 +79,10 @@
if (name != null) {
maximumReadTimeout = asLong(name);
}
+ name = servletConfig.getInitParameter("replyTimeout");
+ if (name != null) {
+ requestTimeout = asLong(name);
+ }
}
/**
@@ -80,7 +96,7 @@
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// lets turn the HTTP post into a JMS Message
try {
- WebClient client = WebClient.getWebClient(request);
+ WebClient client = getWebClient(request);
String text = getPostedMessageBody(request);
@@ -94,12 +110,28 @@
LOG.debug("Sending message to: " + destination + " with text: " + text);
}
+ boolean sync = isSync(request);
TextMessage message = client.getSession().createTextMessage(text);
- appendParametersToMessage(request, message);
- boolean persistent = isSendPersistent(request);
- int priority = getSendPriority(request);
- long timeToLive = getSendTimeToLive(request);
- client.send(destination, message, persistent, priority, timeToLive);
+
+ if (sync) {
+ String point = "activemq:"
+ + ((ActiveMQDestination)destination).getPhysicalName().replace("//", "")
+ + "?requestTimeout=" + requestTimeout;
+ try {
+ String body = (String)client.getProducerTemplate().requestBody(point, text);
+ ActiveMQTextMessage answer = new ActiveMQTextMessage();
+ answer.setText(body);
+ writeMessageResponse(response.getWriter(), answer);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ } else {
+ appendParametersToMessage(request, message);
+ boolean persistent = isSendPersistent(request);
+ int priority = getSendPriority(request);
+ long timeToLive = getSendTimeToLive(request);
+ client.send(destination, message, persistent, priority, timeToLive);
+ }
// lets return a unique URI for reliable messaging
response.setHeader("messageID", message.getJMSMessageID());
@@ -137,7 +169,7 @@
int messages = 0;
try {
- WebClient client = WebClient.getWebClient(request);
+ WebClient client = getWebClient(request);
Destination destination = getDestination(client, request);
if (destination == null) {
throw new NoDestinationSuppliedException();
@@ -224,8 +256,10 @@
}
// look for next message
- message = consumer.receiveNoWait();
messages++;
+ if(maxMessages < 0 || messages < maxMessages) {
+ message = consumer.receiveNoWait();
+ }
}
}
@@ -255,7 +289,7 @@
int messages = 0;
try {
- WebClient client = WebClient.getWebClient(request);
+ WebClient client = getWebClient(request);
Destination destination = getDestination(client, request);
long timeout = getReadTimeout(request);
boolean ajax = isRicoAjax(request);
@@ -317,8 +351,11 @@
}
// look for next message
- message = consumer.receiveNoWait();
messages++;
+ if(maxMessages < 0 || messages < maxMessages) {
+ message = consumer.receiveNoWait();
+ }
+
}
}
} finally {
@@ -362,6 +399,25 @@
String rico = request.getParameter("rico");
return rico != null && rico.equals("true");
}
+
+ public WebClient getWebClient(HttpServletRequest request) {
+ String clientId = request.getParameter("clientId");
+ if (clientId != null) {
+ synchronized(this) {
+ LOG.debug("Getting local client [" + clientId + "]");
+ WebClient client = clients.get(clientId);
+ if (client == null) {
+ LOG.debug("Creating new client [" + clientId + "]");
+ client = new WebClient();
+ clients.put(clientId, client);
+ }
+ return client;
+ }
+
+ } else {
+ return WebClient.getWebClient(request);
+ }
+ }
protected String getContentType(HttpServletRequest request) {
/*
Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java?rev=703161&r1=703160&r2=703161&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java Thu Oct 9 06:17:17 2008
@@ -179,6 +179,14 @@
return defaultMessagePersistent;
}
+ protected boolean isSync(HttpServletRequest request) {
+ String text = request.getParameter("sync");
+ if (text != null) {
+ return true;
+ }
+ return false;
+ }
+
protected Destination asDestination(Object value) {
if (value instanceof Destination) {
return (Destination)value;
Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java?rev=703161&r1=703160&r2=703161&view=diff
==============================================================================
---
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
(original)
+++
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
Thu Oct 9 06:17:17 2008
@@ -47,9 +47,17 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.MessageAvailableConsumer;
+import org.apache.activemq.camel.component.ActiveMQComponent;
+import org.apache.activemq.camel.component.ActiveMQConfiguration;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import sun.util.logging.resources.logging;
+
/**
* Represents a messaging client used from inside a web container typically
* stored inside a HttpSession TODO controls to prevent DOS attacks with users
@@ -77,6 +85,9 @@
private final Semaphore semaphore = new Semaphore(1);
+ private CamelContext camelContext;
+ private ProducerTemplate producerTemplate;
+
public WebClient() {
if (factory == null) {
throw new IllegalStateException("initContext(ServletContext) not called");
@@ -111,6 +122,7 @@
public static void initContext(ServletContext context) {
initConnectionFactory(context);
+ context.setAttribute("webClients", new HashMap<String, WebClient>());
}
public int getDeliveryMode() {
@@ -143,12 +155,16 @@
if (connection != null) {
connection.close();
}
- } catch (JMSException e) {
+ if (producerTemplate != null) {
+ producerTemplate.stop();
+ }
+ } catch (Exception e) {
LOG.debug("caught exception closing consumer", e);
} finally {
producer = null;
session = null;
connection = null;
+ producerTemplate = null;
if (consumers != null) {
consumers.clear();
}
@@ -256,6 +272,27 @@
servletContext.setAttribute(CONNECTION_FACTORY_ATTRIBUTE, factory);
}
}
+
+ public synchronized CamelContext getCamelContext() {
+ if (camelContext == null) {
+ LOG.debug("Creating camel context");
+ camelContext = new DefaultCamelContext();
+ ActiveMQConfiguration conf = new ActiveMQConfiguration();
+ conf.setConnectionFactory(new PooledConnectionFactory((ActiveMQConnectionFactory)factory));
+ ActiveMQComponent component = new ActiveMQComponent(conf);
+ camelContext.addComponent("activemq", component);
+ }
+ return camelContext;
+ }
+
+ public synchronized ProducerTemplate getProducerTemplate() throws Exception {
+ if (producerTemplate == null) {
+ LOG.debug("Creating producer template");
+ producerTemplate = getCamelContext().createProducerTemplate();
+ producerTemplate.start();
+ }
+ return producerTemplate;
+ }
public synchronized MessageProducer getProducer() throws JMSException {
if (producer == null) {