Author: tabish
Date: Tue May 29 14:20:44 2012
New Revision: 1343742
URL: http://svn.apache.org/viewvc?rev=1343742&view=rev
Log:
apply additional fix and unit test for:
https://issues.apache.org/jira/browse/AMQ-3856
Modified:
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
Modified:
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java?rev=1343742&r1=1343741&r2=1343742&view=diff
==============================================================================
---
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java
(original)
+++
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java
Tue May 29 14:20:44 2012
@@ -16,25 +16,26 @@
*/
package org.apache.activemq.web;
+import java.util.Set;
+
import javax.jms.TextMessage;
import javax.management.ObjectName;
import org.apache.commons.lang.RandomStringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.eclipse.jetty.client.ContentExchange;
import org.eclipse.jetty.client.HttpClient;
-
-import java.util.Set;
+import org.eclipse.jetty.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RestTest extends JettyTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(RestTest.class);
- public void testConsume() throws Exception {
- producer.send(session.createTextMessage("test"));
- LOG.info("message sent");
+ public void testConsume() throws Exception {
+ producer.send(session.createTextMessage("test"));
+ LOG.info("message sent");
- HttpClient httpClient = new HttpClient();
+ HttpClient httpClient = new HttpClient();
httpClient.start();
ContentExchange contentExchange = new ContentExchange();
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
@@ -42,9 +43,9 @@ public class RestTest extends JettyTestS
httpClient.send(contentExchange);
contentExchange.waitForDone();
assertEquals("test", contentExchange.getResponseContent());
- }
+ }
- public void testSubscribeFirst() throws Exception {
+ public void testSubscribeFirst() throws Exception {
HttpClient httpClient = new HttpClient();
httpClient.start();
ContentExchange contentExchange = new ContentExchange();
@@ -59,18 +60,18 @@ public class RestTest extends JettyTestS
contentExchange.waitForDone();
assertEquals("test", contentExchange.getResponseContent());
- }
+ }
- public void testSelector() throws Exception {
- TextMessage msg1 = session.createTextMessage("test1");
- msg1.setIntProperty("test", 1);
- producer.send(msg1);
- LOG.info("message 1 sent");
-
- TextMessage msg2 = session.createTextMessage("test2");
- msg2.setIntProperty("test", 2);
- producer.send(msg2);
- LOG.info("message 2 sent");
+ public void testSelector() throws Exception {
+ TextMessage msg1 = session.createTextMessage("test1");
+ msg1.setIntProperty("test", 1);
+ producer.send(msg1);
+ LOG.info("message 1 sent");
+
+ TextMessage msg2 = session.createTextMessage("test2");
+ msg2.setIntProperty("test", 2);
+ producer.send(msg2);
+ LOG.info("message 2 sent");
HttpClient httpClient = new HttpClient();
httpClient.start();
@@ -81,11 +82,11 @@ public class RestTest extends JettyTestS
httpClient.send(contentExchange);
contentExchange.waitForDone();
assertEquals("test2", contentExchange.getResponseContent());
- }
+ }
- // test for https://issues.apache.org/activemq/browse/AMQ-2827
- public void testCorrelation() throws Exception {
- for (int i = 0; i < 200; i++) {
+ // test for https://issues.apache.org/activemq/browse/AMQ-2827
+ public void testCorrelation() throws Exception {
+ for (int i = 0; i < 200; i++) {
String correlId = "RESTY" + RandomStringUtils.randomNumeric(10);
TextMessage message = session.createTextMessage(correlId);
@@ -106,8 +107,8 @@ public class RestTest extends JettyTestS
LOG.info("Received: [" + contentExchange.getResponseStatus() + "]
" + contentExchange.getResponseContent());
assertEquals(200, contentExchange.getResponseStatus());
assertEquals(correlId, contentExchange.getResponseContent());
- }
- }
+ }
+ }
public void testDisconnect() throws Exception {
@@ -133,4 +134,23 @@ public class RestTest extends JettyTestS
Set<ObjectName> subs = broker.getManagementContext().queryNames(query,
null);
assertEquals("Consumers not closed", 0 , subs.size());
}
+
+ public void testPost() throws Exception {
+ HttpClient httpClient = new HttpClient();
+ httpClient.start();
+ ContentExchange contentExchange = new ContentExchange();
+ httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+ contentExchange.setMethod("POST");
+
contentExchange.setURL("http://localhost:8080/message/testPost?type=queue");
+ httpClient.send(contentExchange);
+
+ contentExchange.waitForDone();
+ assertTrue("success status",
HttpStatus.isSuccess(contentExchange.getResponseStatus()));
+
+ ContentExchange contentExchange2 = new ContentExchange();
+
contentExchange2.setURL("http://localhost:8080/message/testPost?readTimeout=1000&type=Queue");
+ httpClient.send(contentExchange2);
+ contentExchange2.waitForDone();
+ assertTrue("success status",
HttpStatus.isSuccess(contentExchange2.getResponseStatus()));
+ }
}
Modified:
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java?rev=1343742&r1=1343741&r2=1343742&view=diff
==============================================================================
---
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
(original)
+++
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
Tue May 29 14:20:44 2012
@@ -36,10 +36,10 @@ import org.apache.activemq.MessageAvaila
import org.apache.activemq.MessageAvailableListener;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTextMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A servlet for sending and receiving messages to/from JMS destinations using
@@ -48,8 +48,8 @@ import org.eclipse.jetty.continuation.Co
* the servlet or as request parameters. <p/> For reading messages you can
* specify a readTimeout parameter to determine how long the servlet should
* block for.
- *
- *
+ *
+ *
*/
public class MessageServlet extends MessageServletSupport {
@@ -61,7 +61,7 @@ public class MessageServlet extends Mess
private long defaultReadTimeout = -1;
private long maximumReadTimeout = 20000;
private long requestTimeout = 1000;
-
+
private HashMap<String, WebClient> clients = new HashMap<String,
WebClient>();
public void init() throws ServletException {
@@ -76,13 +76,13 @@ public class MessageServlet extends Mess
}
name = servletConfig.getInitParameter("replyTimeout");
if (name != null) {
- requestTimeout = asLong(name);
- }
+ requestTimeout = asLong(name);
+ }
}
/**
* Sends a message to a destination
- *
+ *
* @param request
* @param response
* @throws ServletException
@@ -120,24 +120,24 @@ public class MessageServlet extends Mess
TextMessage message = client.getSession().createTextMessage(text);
if (sync) {
- String point = "activemq:"
- +
((ActiveMQDestination)destination).getPhysicalName().replace("//", "")
- + "?requestTimeout=" + requestTimeout;
+ String point = "activemq:"
+ +
((ActiveMQDestination)destination).getPhysicalName().replace("//", "")
+ + "?requestTimeout=" + requestTimeout;
try {
- String body =
(String)client.getProducerTemplate().requestBody(point, text);
+ String body =
(String)client.getProducerTemplate().requestBody(point, text);
ActiveMQTextMessage answer = new ActiveMQTextMessage();
answer.setText(body);
- writeMessageResponse(response.getWriter(), answer);
+ writeMessageResponse(response.getWriter(), answer);
} catch (Exception e) {
- IOException ex = new IOException();
- ex.initCause(e);
- throw ex;
+ IOException ex = new IOException();
+ ex.initCause(e);
+ throw ex;
}
} else {
appendParametersToMessage(request, message);
boolean persistent = isSendPersistent(request);
int priority = getSendPriority(request);
- long timeToLive = getSendTimeToLive(request);
+ long timeToLive = getSendTimeToLive(request);
client.send(destination, message, persistent, priority,
timeToLive);
}
@@ -167,7 +167,7 @@ public class MessageServlet extends Mess
/**
* Reads a message from a destination up to some specific timeout period
- *
+ *
* @param request
* @param response
* @throws ServletException
@@ -182,7 +182,7 @@ public class MessageServlet extends Mess
}
MessageAvailableConsumer consumer =
(MessageAvailableConsumer)client.getConsumer(destination,
request.getHeader(WebClient.selectorName));
Message message = null;
- message = (Message)request.getAttribute("message");
+ message = (Message)request.getAttribute("message");
if (message != null) {
// we're resuming continuation,
// so just write the message and return
@@ -197,7 +197,7 @@ public class MessageServlet extends Mess
Continuation continuation = null;
Listener listener = null;
-
+
// Look for any available messages
message = consumer.receive(10);
@@ -206,7 +206,7 @@ public class MessageServlet extends Mess
// no events.
if (message == null) {
continuation = ContinuationSupport.getContinuation(request);
-
+
if (continuation.isExpired()) {
response.setStatus(HttpServletResponse.SC_NO_CONTENT);
return;
@@ -214,7 +214,7 @@ public class MessageServlet extends Mess
continuation.setTimeout(timeout);
continuation.suspend();
-
+
// Fetch the listeners
listener = (Listener)consumer.getAvailableListener();
if (listener == null) {
@@ -231,7 +231,7 @@ public class MessageServlet extends Mess
throw new ServletException("Could not post JMS message: " + e, e);
}
}
-
+
protected void writeResponse(HttpServletRequest request,
HttpServletResponse response, Message message) throws IOException, JMSException
{
int messages = 0;
try {
@@ -251,7 +251,7 @@ public class MessageServlet extends Mess
if (type != null) {
response.setContentType(type);
}
-
+
setResponseHeaders(response, message);
writeMessageResponse(writer, message);
}
@@ -266,14 +266,18 @@ public class MessageServlet extends Mess
if (message instanceof TextMessage) {
TextMessage textMsg = (TextMessage)message;
String txt = textMsg.getText();
- if (txt.startsWith("<?")) {
- txt = txt.substring(txt.indexOf("?>") + 2);
+ if (txt != null) {
+ if (txt.startsWith("<?")) {
+ txt = txt.substring(txt.indexOf("?>") + 2);
+ }
+ writer.print(txt);
}
- writer.print(txt);
} else if (message instanceof ObjectMessage) {
ObjectMessage objectMsg = (ObjectMessage)message;
Object object = objectMsg.getObject();
- writer.print(object.toString());
+ if (object != null) {
+ writer.print(object.toString());
+ }
}
}
@@ -281,25 +285,25 @@ public class MessageServlet extends Mess
String rico = request.getParameter("rico");
return rico != null && rico.equals("true");
}
-
+
public WebClient getWebClient(HttpServletRequest request) {
- String clientId = request.getParameter("clientId");
- if (clientId != null) {
- synchronized(this) {
- LOG.debug("Getting local client [" + clientId + "]");
- WebClient client = clients.get(clientId);
- if (client == null) {
- LOG.debug("Creating new client [" + clientId +
"]");
- client = new WebClient();
- clients.put(clientId, client);
- }
- return client;
- }
-
- } else {
- return WebClient.getWebClient(request);
- }
- }
+ String clientId = request.getParameter("clientId");
+ if (clientId != null) {
+ synchronized(this) {
+ LOG.debug("Getting local client [" + clientId + "]");
+ WebClient client = clients.get(clientId);
+ if (client == null) {
+ LOG.debug("Creating new client [" + clientId + "]");
+ client = new WebClient();
+ clients.put(clientId, client);
+ }
+ return client;
+ }
+
+ } else {
+ return WebClient.getWebClient(request);
+ }
+ }
protected String getContentType(HttpServletRequest request) {
/*
@@ -365,7 +369,7 @@ public class MessageServlet extends Mess
} catch (Exception e) {
LOG.error("Error receiving message " + e, e);
}
- continuation.resume();
+ continuation.resume();
}
}
}