Author: dejanb
Date: Thu May 6 11:21:13 2010
New Revision: 941664
URL: http://svn.apache.org/viewvc?rev=941664&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-1547 - selector for rest consumer
Added:
activemq/trunk/activemq-web-demo/src/test/
activemq/trunk/activemq-web-demo/src/test/java/
activemq/trunk/activemq-web-demo/src/test/java/org/
activemq/trunk/activemq-web-demo/src/test/java/org/apache/
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyServer.java
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java
activemq/trunk/activemq-web-demo/src/test/resources/
activemq/trunk/activemq-web-demo/src/test/resources/log4j.properties
(with props)
Modified:
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
Added:
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyServer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyServer.java?rev=941664&view=auto
==============================================================================
---
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyServer.java
(added)
+++
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyServer.java
Thu May 6 11:21:13 2010
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.web;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.demo.DefaultQueueSender;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.webapp.WebAppContext;
+
+/**
+ * A simple bootstrap class for starting Jetty in your IDE using the local web
+ * application.
+ *
+ * @version $Revision$
+ */
+public final class JettyServer {
+
+ public static final int PORT = 8080;
+
+ public static final String WEBAPP_DIR = "src/main/webapp";
+
+ public static final String WEBAPP_CTX = "/";
+
+ private JettyServer() {
+ }
+
+ public static void main(String[] args) throws Exception {
+ // lets create a broker
+ BrokerService broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(true);
+ broker.addConnector("tcp://localhost:61616");
+ broker.addConnector("stomp://localhost:61613");
+ broker.start();
+
+ // lets publish some messages so that there is some stuff to browse
+ DefaultQueueSender.main(new String[] {
+ "FOO.BAR"
+ });
+
+ // now lets start the web server
+ int port = PORT;
+ if (args.length > 0) {
+ String text = args[0];
+ port = Integer.parseInt(text);
+ }
+ System.out.println("Starting Web Server on port: " + port);
+ Server server = new Server();
+ SelectChannelConnector connector = new SelectChannelConnector();
+ connector.setPort(port);
+ connector.setServer(server);
+ WebAppContext context = new WebAppContext();
+
+ context.setResourceBase(WEBAPP_DIR);
+ context.setContextPath(WEBAPP_CTX);
+ context.setServer(server);
+ server.setHandler(context);
+ server.setConnectors(new Connector[] {
+ connector
+ });
+ server.start();
+ }
+}
Added:
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java?rev=941664&view=auto
==============================================================================
---
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java
(added)
+++
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java
Thu May 6 11:21:13 2010
@@ -0,0 +1,65 @@
+package org.apache.activemq.web;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.webapp.WebAppContext;
+
+import junit.framework.TestCase;
+
+public class JettyTestSupport extends TestCase {
+
+ BrokerService broker;
+ Server server;
+ ActiveMQConnectionFactory factory;
+ Connection connection;
+ Session session;
+ MessageProducer producer;
+
+ protected void setUp() throws Exception {
+ broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(true);
+ broker.addConnector("tcp://localhost:61616");
+ broker.start();
+ broker.waitUntilStarted();
+
+ server = new Server();
+ SelectChannelConnector connector = new SelectChannelConnector();
+ connector.setPort(8080);
+ connector.setServer(server);
+ WebAppContext context = new WebAppContext();
+
+ context.setResourceBase("src/main/webapp");
+ context.setContextPath("/");
+ context.setServer(server);
+ server.setHandler(context);
+ server.setConnectors(new Connector[] {
+ connector
+ });
+ server.start();
+
+ factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ connection = factory.createConnection();
+ connection.start();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(session.createQueue("test"));
+ }
+
+ protected void tearDown() throws Exception {
+ server.stop();
+ broker.stop();
+ broker.waitUntilStopped();
+ session.close();
+ connection.close();
+ }
+
+
+
+}
Added:
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java?rev=941664&view=auto
==============================================================================
---
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java
(added)
+++
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java
Thu May 6 11:21:13 2010
@@ -0,0 +1,44 @@
+package org.apache.activemq.web;
+
+import javax.jms.TextMessage;
+
+import org.eclipse.jetty.client.ContentExchange;
+import org.eclipse.jetty.client.HttpClient;
+
+public class RestTest extends JettyTestSupport {
+
+ public void testConsume() throws Exception {
+ producer.send(session.createTextMessage("test"));
+
+ HttpClient httpClient = new HttpClient();
+ httpClient.start();
+ ContentExchange contentExchange = new ContentExchange();
+ httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+
contentExchange.setURL("http://localhost:8080/message/test?timeout=1000&type=queue");
+ httpClient.send(contentExchange);
+ contentExchange.waitForDone();
+ assertEquals("test", contentExchange.getResponseContent());
+
+ }
+
+ public void testSelector() throws Exception {
+ TextMessage msg1 = session.createTextMessage("test1");
+ msg1.setIntProperty("test", 1);
+ producer.send(msg1);
+
+ TextMessage msg2 = session.createTextMessage("test2");
+ msg2.setIntProperty("test", 2);
+ producer.send(msg2);
+
+ HttpClient httpClient = new HttpClient();
+ httpClient.start();
+ ContentExchange contentExchange = new ContentExchange();
+ httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+
contentExchange.setURL("http://localhost:8080/message/test?timeout=1000&type=queue");
+ contentExchange.setRequestHeader(WebClient.SELECTOR_NAME, "test=2");
+ httpClient.send(contentExchange);
+ contentExchange.waitForDone();
+ assertEquals("test2", contentExchange.getResponseContent());
+ }
+
+}
Added: activemq/trunk/activemq-web-demo/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/resources/log4j.properties?rev=941664&view=auto
==============================================================================
--- activemq/trunk/activemq-web-demo/src/test/resources/log4j.properties (added)
+++ activemq/trunk/activemq-web-demo/src/test/resources/log4j.properties Thu
May 6 11:21:13 2010
@@ -0,0 +1,36 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=INFO, out, stdout
+
+log4j.logger.org.apache.activemq.spring=WARN
+log4j.logger.org.apache.activemq.web=DEBUG
+
+# CONSOLE appender not used by default
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1}
- %m%n
+
+# File appender
+log4j.appender.out=org.apache.log4j.FileAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} -
%m%n
+log4j.appender.out.file=target/test-reports/activemq-test.log
+log4j.appender.out.append=true
Propchange: activemq/trunk/activemq-web-demo/src/test/resources/log4j.properties
------------------------------------------------------------------------------
svn:executable = *
Modified:
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java?rev=941664&r1=941663&r2=941664&view=diff
==============================================================================
---
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
(original)
+++
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
Thu May 6 11:21:13 2010
@@ -156,7 +156,7 @@ public class MessageListenerServlet exte
Map<MessageAvailableConsumer, String>
consumerDestinationMap = getConsumerDestinationNameMap(request);
client.closeConsumer(destination); // drop any existing
// consumer.
- MessageAvailableConsumer consumer =
(MessageAvailableConsumer)client.getConsumer(destination);
+ MessageAvailableConsumer consumer =
(MessageAvailableConsumer)client.getConsumer(destination,
request.getHeader(WebClient.SELECTOR_NAME));
consumer.setAvailableListener(listener);
consumerIdMap.put(consumer, message);
@@ -167,7 +167,7 @@ public class MessageListenerServlet exte
} else if ("unlisten".equals(type)) {
Map<MessageAvailableConsumer, String> consumerIdMap =
getConsumerIdMap(request);
Map consumerDestinationMap =
getConsumerDestinationNameMap(request);
- MessageAvailableConsumer consumer =
(MessageAvailableConsumer)client.getConsumer(destination);
+ MessageAvailableConsumer consumer =
(MessageAvailableConsumer)client.getConsumer(destination,
request.getHeader(WebClient.SELECTOR_NAME));
consumer.setAvailableListener(null);
consumerIdMap.remove(consumer);
Modified:
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java?rev=941664&r1=941663&r2=941664&view=diff
==============================================================================
---
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
(original)
+++
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
Thu May 6 11:21:13 2010
@@ -180,7 +180,7 @@ public class MessageServlet extends Mess
LOG.debug("Receiving message(s) from: " + destination + " with
timeout: " + timeout);
}
- MessageAvailableConsumer consumer =
(MessageAvailableConsumer)client.getConsumer(destination);
+ MessageAvailableConsumer consumer =
(MessageAvailableConsumer)client.getConsumer(destination,
request.getHeader(WebClient.SELECTOR_NAME));
Continuation continuation = null;
Listener listener = null;
Message message = null;
@@ -297,7 +297,7 @@ public class MessageServlet extends Mess
LOG.debug("Receiving message(s) from: " + destination + " with
timeout: " + timeout);
}
- MessageAvailableConsumer consumer =
(MessageAvailableConsumer)client.getConsumer(destination);
+ MessageAvailableConsumer consumer =
(MessageAvailableConsumer)client.getConsumer(destination,
request.getHeader(WebClient.SELECTOR_NAME));
Message message = null;
// write a responds
Modified:
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java?rev=941664&r1=941663&r2=941664&view=diff
==============================================================================
---
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
(original)
+++
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
Thu May 6 11:21:13 2010
@@ -353,4 +353,8 @@ public abstract class MessageServletSupp
}
return answer;
}
+
+ protected String getSelector(HttpServletRequest request) throws
IOException {
+ return request.getHeader(WebClient.SELECTOR_NAME);
+ }
}
Modified:
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java?rev=941664&r1=941663&r2=941664&view=diff
==============================================================================
---
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
(original)
+++
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
Thu May 6 11:21:13 2010
@@ -70,6 +70,7 @@ public class WebClient implements HttpSe
public static final String CONNECTION_FACTORY_PREFETCH_PARAM =
"org.apache.activemq.connectionFactory.prefetch";
public static final String CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM =
"org.apache.activemq.connectionFactory.optimizeAck";
public static final String BROKER_URL_INIT_PARAM =
"org.apache.activemq.brokerURL";
+ public static final String SELECTOR_NAME = "org.apache.activemq.selector";
private static final Log LOG = LogFactory.getLog(WebClient.class);
@@ -196,7 +197,7 @@ public class WebClient implements HttpSe
try {
Destination destination =
destinationName.startsWith("topic://") ?
(Destination)getSession().createTopic(destinationName) :
(Destination)getSession().createQueue(destinationName);
- consumers.put(destination, getConsumer(destination, true));
+ consumers.put(destination, getConsumer(destination, null,
true));
} catch (JMSException e) {
LOG.debug("Caought Exception ", e);
IOException ex = new IOException(e.getMessage());
@@ -304,14 +305,14 @@ public class WebClient implements HttpSe
this.producer = producer;
}
- public synchronized MessageConsumer getConsumer(Destination destination)
throws JMSException {
- return getConsumer(destination, true);
+ public synchronized MessageConsumer getConsumer(Destination destination,
String selector) throws JMSException {
+ return getConsumer(destination, selector, true);
}
- public synchronized MessageConsumer getConsumer(Destination destination,
boolean create) throws JMSException {
+ public synchronized MessageConsumer getConsumer(Destination destination,
String selector, boolean create) throws JMSException {
MessageConsumer consumer = consumers.get(destination);
if (create && consumer == null) {
- consumer = getSession().createConsumer(destination);
+ consumer = getSession().createConsumer(destination, selector);
consumers.put(destination, consumer);
}
return consumer;