Author: foconer
Date: Tue Dec 20 01:54:54 2005
New Revision: 357943
URL: http://svn.apache.org/viewcvs?rev=357943&view=rev
Log:
Issue: source is not compiled. The generated jar file is empty.
Solution: Restructured the source to src/main/java.
Added:
incubator/activemq/trunk/activemq-web/src/main/
incubator/activemq/trunk/activemq-web/src/main/java/
incubator/activemq/trunk/activemq-web/src/main/java/org/
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ConnectionManager.java
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServlet.java
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServletSupport.java
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/NoDestinationSuppliedException.java
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/PortfolioPublishServlet.java
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/SpringBrokerContextListener.java
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/WebClient.java
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/package.html
Removed:
incubator/activemq/trunk/activemq-web/src/java/
Added:
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ConnectionManager.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ConnectionManager.java?rev=357943&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ConnectionManager.java
(added)
+++
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ConnectionManager.java
Tue Dec 20 01:54:54 2005
@@ -0,0 +1,50 @@
+/**
+ *
+ * Copyright 2004 Protique Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+
+package org.activemq.web;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.servlet.http.HttpSessionEvent;
+import javax.servlet.http.HttpSessionListener;
+
+/**
+ * Listens to sessions closing to ensure that JMS connections are
+ * cleaned up nicely
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ConnectionManager implements HttpSessionListener {
+ private static final Log log = LogFactory.getLog(ConnectionManager.class);
+
+ public void sessionCreated(HttpSessionEvent event) {
+ }
+
+ public void sessionDestroyed(HttpSessionEvent event) {
+ /** TODO we can't use the session any more now!
+ WebClient client = WebClient.getWebClient(event.getSession());
+ try {
+ client.stop();
+ }
+ catch (JMSException e) {
+ log.warn("Error closing connection: " + e, e);
+ }
+ */
+ }
+}
Added:
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServlet.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServlet.java?rev=357943&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServlet.java
(added)
+++
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServlet.java
Tue Dec 20 01:54:54 2005
@@ -0,0 +1,420 @@
+/**
+ *
+ * Copyright 2004 Protique Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+
+package org.activemq.web;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.activemq.MessageAvailableConsumer;
+import org.activemq.MessageAvailableListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.mortbay.util.ajax.Continuation;
+import org.mortbay.util.ajax.ContinuationSupport;
+
+/**
+ * A servlet for sending and receiving messages to/from JMS destinations using
+ * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the
+ * destination and whether it is a topic or queue via configuration details on
+ * the servlet or as request parameters. <p/> For reading messages you can
+ * specify a readTimeout parameter to determine how long the servlet should
+ * block for.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class MessageServlet extends MessageServletSupport {
+ private static final Log log = LogFactory.getLog(MessageServlet.class);
+
+ private String readTimeoutParameter = "readTimeout";
+ private long defaultReadTimeout = -1;
+ private long maximumReadTimeout = 20000;
+
+ public void init() throws ServletException {
+ ServletConfig servletConfig = getServletConfig();
+ String name = servletConfig.getInitParameter("defaultReadTimeout");
+ if (name != null) {
+ defaultReadTimeout = asLong(name);
+ }
+ name = servletConfig.getInitParameter("maximumReadTimeout");
+ if (name != null) {
+ maximumReadTimeout = asLong(name);
+ }
+ }
+
+ /**
+ * Sends a message to a destination
+ *
+ * @param request
+ * @param response
+ * @throws ServletException
+ * @throws IOException
+ */
+ protected void doPost(HttpServletRequest request, HttpServletResponse
response) throws ServletException, IOException {
+ // lets turn the HTTP post into a JMS Message
+ try {
+ WebClient client = getWebClient(request);
+
+ String text = getPostedMessageBody(request);
+
+ // lets create the destination from the URI?
+ Destination destination = getDestination(client, request);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Sending message to: " + destination + " with text:
" + text);
+ }
+
+ TextMessage message = client.getSession().createTextMessage(text);
+ appendParametersToMessage(request, message);
+ client.send(destination, message);
+
+ // lets return a unique URI for reliable messaging
+ response.setHeader("messageID", message.getJMSMessageID());
+ response.setStatus(HttpServletResponse.SC_OK);
+ }
+ catch (JMSException e) {
+ throw new ServletException("Could not post JMS message: " + e, e);
+ }
+ }
+
+ /**
+ * Supports a HTTP DELETE to be equivlanent of consuming a singe message
+ * from a queue
+ */
+ protected void doDelete(HttpServletRequest request, HttpServletResponse
response) throws ServletException, IOException {
+ doMessages(request, response, 1);
+ }
+
+ /**
+ * Supports a HTTP DELETE to be equivlanent of consuming a singe message
+ * from a queue
+ */
+ protected void doGet(HttpServletRequest request, HttpServletResponse
response) throws ServletException, IOException {
+ doMessages(request, response, -1);
+ }
+
+ /**
+ * Reads a message from a destination up to some specific timeout period
+ *
+ * @param request
+ * @param response
+ * @throws ServletException
+ * @throws IOException
+ */
+ protected void doMessages(HttpServletRequest request, HttpServletResponse
response, int maxMessages) throws ServletException, IOException {
+
+ int messages = 0;
+ try {
+ WebClient client = getWebClient(request);
+ Destination destination = getDestination(client, request);
+ long timeout = getReadTimeout(request);
+ boolean ajax = isRicoAjax(request);
+ if (!ajax)
+ maxMessages = 1;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Receiving message(s) from: " + destination + " with
timeout: " + timeout);
+ }
+
+ MessageAvailableConsumer consumer = (MessageAvailableConsumer)
client.getConsumer(destination);
+ Continuation continuation = null;
+ Listener listener = null;
+ Message message = null;
+
+ synchronized (consumer) {
+ // Fetch the listeners
+ listener = (Listener) consumer.getAvailableListener();
+ if (listener == null) {
+ listener = new Listener(consumer);
+ consumer.setAvailableListener(listener);
+ }
+ // Look for any available messages
+ message = consumer.receiveNoWait();
+
+ // Get an existing Continuation or create a new one if there
are
+ // no events.
+ if (message == null) {
+ continuation =
ContinuationSupport.getContinuation(request, consumer);
+
+ // register this continuation with our listener.
+ listener.setContinuation(continuation);
+
+ // Get the continuation object (may wait and/or retry
+ // request here).
+ continuation.suspend(timeout);
+ }
+
+ // Try again now
+ if (message == null)
+ message = consumer.receiveNoWait();
+
+ // write a responds
+ response.setContentType("text/xml");
+ PrintWriter writer = response.getWriter();
+
+ if (ajax)
+ writer.println("<ajax-response>");
+
+ // handle any message(s)
+ if (message == null) {
+ // No messages so OK response of for ajax else no content.
+ response.setStatus(ajax ? HttpServletResponse.SC_OK :
HttpServletResponse.SC_NO_CONTENT);
+ }
+ else {
+ // We have at least one message so set up the response
+ response.setStatus(HttpServletResponse.SC_OK);
+ String type = getContentType(request);
+ if (type != null)
+ response.setContentType(type);
+
+ // send a response for each available message (up to max
+ // messages)
+ while ((maxMessages < 0 || messages < maxMessages) &&
message != null) {
+ //
System.err.println("message["+messages+"]="+message);
+ if (ajax) {
+ writer.print("<response type='object' id='");
+ writer.print(request.getParameter("id"));
+ writer.println("'>");
+ }
+ else
+ // only ever 1 message for non ajax!
+ setResponseHeaders(response, message);
+
+ writeMessageResponse(writer, message);
+
+ if (ajax)
+ writer.println("</response>");
+
+ // look for next message
+ message = consumer.receiveNoWait();
+ messages++;
+ }
+ }
+
+ if (ajax) {
+ writer.println("<response type='object'
id='poll'><ok/></response>");
+ writer.println("</ajax-response>");
+ }
+ }
+ }
+ catch (JMSException e) {
+ throw new ServletException("Could not post JMS message: " + e, e);
+ }
+ finally {
+ if (log.isDebugEnabled()) {
+ log.debug("Received " + messages + " message(s)");
+ }
+ }
+ }
+
+ /**
+ * Reads a message from a destination up to some specific timeout period
+ *
+ * @param request
+ * @param response
+ * @throws ServletException
+ * @throws IOException
+ */
+ protected void doMessagesWithoutContinuation(HttpServletRequest request,
HttpServletResponse response,
+ int maxMessages) throws ServletException, IOException {
+
+ int messages = 0;
+ try {
+ WebClient client = getWebClient(request);
+ Destination destination = getDestination(client, request);
+ long timeout = getReadTimeout(request);
+ boolean ajax = isRicoAjax(request);
+ if (!ajax)
+ maxMessages = 1;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Receiving message(s) from: " + destination + " with
timeout: " + timeout);
+ }
+
+ MessageAvailableConsumer consumer = (MessageAvailableConsumer)
client.getConsumer(destination);
+ Continuation continuation = null;
+ Listener listener = null;
+ Message message = null;
+
+ // write a responds
+ response.setContentType("text/xml");
+ PrintWriter writer = response.getWriter();
+
+ if (ajax)
+ writer.println("<ajax-response>");
+
+ // Only one client thread at a time should poll for messages.
+ if (client.getSemaphore().tryAcquire()) {
+ try {
+ // Look for any available messages
+ message = consumer.receive(timeout);
+
+ // handle any message(s)
+ if (message == null) {
+ // No messages so OK response of for ajax else no
+ // content.
+ response.setStatus(ajax ? HttpServletResponse.SC_OK :
HttpServletResponse.SC_NO_CONTENT);
+ } else {
+ // We have at least one message so set up the
+ // response
+ response.setStatus(HttpServletResponse.SC_OK);
+ String type = getContentType(request);
+ if (type != null)
+ response.setContentType(type);
+
+ // send a response for each available message (up to
+ // max
+ // messages)
+ while ((maxMessages < 0 || messages < maxMessages) &&
message != null) {
+ //
System.err.println("message["+messages+"]="+message);
+ if (ajax) {
+ writer.print("<response type='object' id='");
+ writer.print(request.getParameter("id"));
+ writer.println("'>");
+ } else
+ // only ever 1 message for non ajax!
+ setResponseHeaders(response, message);
+
+ writeMessageResponse(writer, message);
+
+ if (ajax)
+ writer.println("</response>");
+
+ // look for next message
+ message = consumer.receiveNoWait();
+ messages++;
+ }
+ }
+ } finally {
+ client.getSemaphore().release();
+ }
+ } else {
+ // Client is using us in another thread.
+ response.setStatus(ajax ? HttpServletResponse.SC_OK :
HttpServletResponse.SC_NO_CONTENT);
+ }
+
+ if (ajax) {
+ writer.println("<response type='object'
id='poll'><ok/></response>");
+ writer.println("</ajax-response>");
+ }
+
+ } catch (JMSException e) {
+ throw new ServletException("Could not post JMS message: " + e, e);
+ } finally {
+ if (log.isDebugEnabled()) {
+ log.debug("Received " + messages + " message(s)");
+ }
+ }
+ }
+
+ protected void writeMessageResponse(PrintWriter writer, Message message)
throws JMSException, IOException {
+ if (message instanceof TextMessage) {
+ TextMessage textMsg = (TextMessage) message;
+ writer.print(textMsg.getText());
+ }
+ else if (message instanceof ObjectMessage) {
+ ObjectMessage objectMsg = (ObjectMessage) message;
+ Object object = objectMsg.getObject();
+ writer.print(object.toString());
+ }
+ }
+
+ protected boolean isRicoAjax(HttpServletRequest request) {
+ String rico = request.getParameter("rico");
+ return rico != null && rico.equals("true");
+ }
+
+ protected String getContentType(HttpServletRequest request) {
+ /*
+ * log("Params: " + request.getParameterMap()); Enumeration iter =
+ * request.getHeaderNames(); while (iter.hasMoreElements()) { String
+ * name = (String) iter.nextElement(); log("Header: " + name + " = " +
+ * request.getHeader(name)); }
+ */
+ String value = request.getParameter("xml");
+ if (value != null && "true".equalsIgnoreCase(value)) {
+ return "text/xml";
+ }
+ return null;
+ }
+
+ protected void setResponseHeaders(HttpServletResponse response, Message
message) throws JMSException {
+ response.setHeader("destination",
message.getJMSDestination().toString());
+ response.setHeader("id", message.getJMSMessageID());
+ }
+
+ /**
+ * @return the timeout value for read requests which is always >= 0 and <=
+ * maximumReadTimeout to avoid DoS attacks
+ */
+ protected long getReadTimeout(HttpServletRequest request) {
+ long answer = defaultReadTimeout;
+
+ String name = request.getParameter(readTimeoutParameter);
+ if (name != null) {
+ answer = asLong(name);
+ }
+ if (answer < 0 || answer > maximumReadTimeout) {
+ answer = maximumReadTimeout;
+ }
+ return answer;
+ }
+
+ /*
+ * Listen for available messages and wakeup any continuations.
+ */
+ private class Listener implements MessageAvailableListener {
+ MessageConsumer consumer;
+ Continuation continuation;
+ List queue = new LinkedList();
+
+ Listener(MessageConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+ public void setContinuation(Continuation continuation) {
+ synchronized (consumer) {
+ this.continuation = continuation;
+ }
+ }
+
+ public void onMessageAvailable(MessageConsumer consumer) {
+ assert this.consumer == consumer;
+
+ synchronized (this.consumer) {
+ if (continuation != null)
+ continuation.resume();
+ continuation = null;
+ }
+ }
+ }
+
+}
Added:
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServletSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServletSupport.java?rev=357943&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServletSupport.java
(added)
+++
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServletSupport.java
Tue Dec 20 01:54:54 2005
@@ -0,0 +1,223 @@
+/**
+ *
+ * Copyright 2004 Protique Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+
+package org.activemq.web;
+
+import org.activemq.command.ActiveMQQueue;
+import org.activemq.command.ActiveMQTopic;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpSession;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A useful base class for any JMS related servlet;
+ * there are various ways to map JMS operations to web requests
+ * so we put most of the common behaviour in a reusable base class.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public abstract class MessageServletSupport extends HttpServlet {
+
+ private boolean defaultTopicFlag = true;
+ private Destination defaultDestination;
+ private String destinationParameter = "destination";
+ private String topicParameter = "topic";
+ private String bodyParameter = "body";
+
+
+ public void init(ServletConfig servletConfig) throws ServletException {
+ super.init(servletConfig);
+
+ String name = servletConfig.getInitParameter("topic");
+ if (name != null) {
+ defaultTopicFlag = asBoolean(name);
+ }
+
+ log("Defaulting to use topics: " + defaultTopicFlag);
+
+ name = servletConfig.getInitParameter("destination");
+ if (name != null) {
+ if (defaultTopicFlag) {
+ defaultDestination = new ActiveMQTopic(name);
+ }
+ else {
+ defaultDestination = new ActiveMQQueue(name);
+ }
+ }
+
+ // lets check to see if there's a connection factory set
+ WebClient.initContext(getServletContext());
+ }
+
+ protected WebClient createWebClient(HttpServletRequest request) {
+ return new WebClient(getServletContext());
+ }
+
+ public static boolean asBoolean(String param) {
+ return asBoolean(param, false);
+ }
+
+ public static boolean asBoolean(String param, boolean defaultValue) {
+ if (param == null) {
+ return defaultValue;
+ }
+ else {
+ return param.equalsIgnoreCase("true");
+ }
+ }
+
+ /**
+ * Helper method to get the client for the current session
+ *
+ * @param request is the current HTTP request
+ * @return the current client or a newly creates
+ */
+ protected WebClient getWebClient(HttpServletRequest request) {
+ HttpSession session = request.getSession(true);
+ WebClient client = WebClient.getWebClient(session);
+ if (client == null) {
+ client = createWebClient(request);
+ session.setAttribute(WebClient.webClientAttribute, client);
+ }
+ return client;
+ }
+
+
+ protected void appendParametersToMessage(HttpServletRequest request,
TextMessage message) throws JMSException {
+ for (Iterator iter = request.getParameterMap().entrySet().iterator();
iter.hasNext();) {
+ Map.Entry entry = (Map.Entry) iter.next();
+ String name = (String) entry.getKey();
+ if (!destinationParameter.equals(name) &&
!topicParameter.equals(name) && !bodyParameter.equals(name)) {
+ Object value = entry.getValue();
+ if (value instanceof Object[]) {
+ Object[] array = (Object[]) value;
+ if (array.length == 1) {
+ value = array[0];
+ }
+ else {
+ log("Can't use property: " + name + " which is of
type: " + value.getClass().getName() + " value");
+ value = null;
+ for (int i = 0, size = array.length; i < size; i++) {
+ log("value[" + i + "] = " + array[i]);
+ }
+ }
+ }
+ if (value != null) {
+ message.setObjectProperty(name, value);
+ }
+ }
+ }
+ }
+
+ /**
+ * @return the destination to use for the current request
+ */
+ protected Destination getDestination(WebClient client, HttpServletRequest
request) throws JMSException, NoDestinationSuppliedException {
+ String destinationName = request.getParameter(destinationParameter);
+ if (destinationName == null) {
+ if (defaultDestination == null) {
+ return getDestinationFromURI(client, request);
+ }
+ else {
+ return defaultDestination;
+ }
+ }
+
+ return getDestination(client, request, destinationName);
+ }
+
+ /**
+ * @return the destination to use for the current request using the
relative URI from
+ * where this servlet was invoked as the destination name
+ */
+ protected Destination getDestinationFromURI(WebClient client,
HttpServletRequest request) throws NoDestinationSuppliedException, JMSException
{
+ String uri = request.getPathInfo();
+ if (uri == null) {
+ throw new NoDestinationSuppliedException();
+ }
+ // replace URI separator with JMS destination separator
+ if (uri.startsWith("/")) {
+ uri = uri.substring(1);
+ }
+ uri = uri.replace('/', '.');
+ return getDestination(client, request, uri);
+ }
+
+ /**
+ * @return the Destination object for the given destination name
+ */
+ protected Destination getDestination(WebClient client, HttpServletRequest
request, String destinationName) throws JMSException {
+ if (isTopic(request)) {
+ return client.getSession().createTopic(destinationName);
+ }
+ else {
+ return client.getSession().createQueue(destinationName);
+ }
+ }
+
+ /**
+ * @return true if the current request is for a topic destination, else
false if its for a queue
+ */
+ protected boolean isTopic
+ (HttpServletRequest
+ request) {
+ boolean aTopic = defaultTopicFlag;
+ String aTopicText = request.getParameter(topicParameter);
+ if (aTopicText != null) {
+ aTopic = asBoolean(aTopicText);
+ }
+ return aTopic;
+ }
+
+ protected long asLong(String name) {
+ return Long.parseLong(name);
+ }
+
+ /**
+ * @return the text that was posted to the servlet which is used as the
body
+ * of the message to be sent
+ */
+ protected String getPostedMessageBody(HttpServletRequest request) throws
IOException {
+ String answer = request.getParameter(bodyParameter);
+ if (answer == null) {
+ // lets read the message body instead
+ BufferedReader reader = request.getReader();
+ StringBuffer buffer = new StringBuffer();
+ while (true) {
+ String line = reader.readLine();
+ if (line == null) {
+ break;
+ }
+ buffer.append(line);
+ buffer.append("\n");
+ }
+ return buffer.toString();
+ }
+ return answer;
+ }
+}
Added:
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/NoDestinationSuppliedException.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/NoDestinationSuppliedException.java?rev=357943&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/NoDestinationSuppliedException.java
(added)
+++
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/NoDestinationSuppliedException.java
Tue Dec 20 01:54:54 2005
@@ -0,0 +1,32 @@
+/**
+ *
+ * Copyright 2004 Protique Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.activemq.web;
+
+import javax.servlet.ServletException;
+
+/**
+ * Exception thrown if there was no destination available
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class NoDestinationSuppliedException extends ServletException {
+
+ public NoDestinationSuppliedException() {
+ super("Could not perform the JMS operation as no Destination was
supplied");
+ }
+}
Added:
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/PortfolioPublishServlet.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/PortfolioPublishServlet.java?rev=357943&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/PortfolioPublishServlet.java
(added)
+++
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/PortfolioPublishServlet.java
Tue Dec 20 01:54:54 2005
@@ -0,0 +1,138 @@
+/**
+ *
+ * Copyright 2004 Protique Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.activemq.web;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Hashtable;
+import java.util.Map;
+
+/**
+ * A servlet which will publish dummy market data prices
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class PortfolioPublishServlet extends MessageServletSupport {
+
+ private static final int maxDeltaPercent = 1;
+ private static final Map lastPrices = new Hashtable();
+ private boolean ricoStyle = true;
+
+
+ public void init() throws ServletException {
+ super.init();
+
+ ricoStyle = asBoolean(getServletConfig().getInitParameter("rico"),
true);
+ }
+
+ protected void doGet(HttpServletRequest request, HttpServletResponse
response) throws ServletException, IOException {
+ PrintWriter out = response.getWriter();
+ String[] stocks = request.getParameterValues("stocks");
+ if (stocks == null || stocks.length == 0) {
+ out.println("<html><body>No <b>stocks</b> query parameter
specified. Cannot publish market data</body></html>");
+ }
+ else {
+ Integer
total=(Integer)request.getSession(true).getAttribute("total");
+ if (total==null)
+ total=new Integer(0);
+
+
+ int count = getNumberOfMessages(request);
+ total=new Integer(total.intValue()+count);
+ request.getSession().setAttribute("total",total);
+
+ try {
+ WebClient client = getWebClient(request);
+ for (int i = 0; i < count; i++) {
+ sendMessage(client, stocks);
+ }
+ out.print("<html><head><meta http-equiv='refresh' content='");
+ String refreshRate = request.getParameter("refresh");
+ if (refreshRate == null || refreshRate.length() == 0) {
+ refreshRate = "1";
+ }
+ out.print(refreshRate);
+ out.println("'/></head>");
+ out.println("<body>Published <b>" + count + "</b> of "+total+
" price messages. Refresh = "+refreshRate+"s");
+ out.println("</body></html>");
+
+ }
+ catch (JMSException e) {
+ out.println("<html><body>Failed sending price messages due to
<b>" + e + "</b></body></html>");
+ log("Failed to send message: " + e, e);
+ }
+ }
+ }
+
+ protected void sendMessage(WebClient client, String[] stocks) throws
JMSException {
+ Session session = client.getSession();
+
+ int idx = 0;
+ while (true) {
+ idx = (int) Math.round(stocks.length * Math.random());
+ if (idx < stocks.length) {
+ break;
+ }
+ }
+ String stock = stocks[idx];
+ Destination destination = session.createTopic("STOCKS." + stock);
+ String stockText = createStockText(stock);
+ log("Sending: " + stockText + " on destination: " + destination);
+ Message message = session.createTextMessage(stockText);
+ client.send(destination, message);
+ }
+
+ protected String createStockText(String stock) {
+ Double value = (Double) lastPrices.get(stock);
+ if (value == null) {
+ value = new Double(Math.random() * 100);
+ }
+
+ // lets mutate the value by some percentage
+ double oldPrice = value.doubleValue();
+ value = new Double(mutatePrice(oldPrice));
+ lastPrices.put(stock, value);
+ double price = value.doubleValue();
+
+ double offer = price * 1.001;
+
+ String movement = (price > oldPrice) ? "up" : "down";
+ return "<price stock='" + stock + "' bid='" + price + "' offer='" +
offer + "' movement='" + movement + "'/>";
+ }
+
+ protected double mutatePrice(double price) {
+ double percentChange = (2 * Math.random() * maxDeltaPercent) -
maxDeltaPercent;
+
+ return price * (100 + percentChange) / 100;
+ }
+
+ protected int getNumberOfMessages(HttpServletRequest request) {
+ String name = request.getParameter("count");
+ if (name != null) {
+ return Integer.parseInt(name);
+ }
+ return 1;
+ }
+}
Added:
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/SpringBrokerContextListener.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/SpringBrokerContextListener.java?rev=357943&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/SpringBrokerContextListener.java
(added)
+++
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/SpringBrokerContextListener.java
Tue Dec 20 01:54:54 2005
@@ -0,0 +1,117 @@
+/**
+ *
+ * Copyright 2004 Protique Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.activemq.web;
+
+import org.activemq.broker.BrokerService;
+import org.activemq.xbean.BrokerFactoryBean;
+import org.springframework.core.io.Resource;
+import org.springframework.web.context.support.ServletContextResource;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
+
+/**
+ * Used to configure and instance of ActiveMQ <tt>BrokerService</tt> using
+ * ActiveMQ/Spring's xml configuration. <p/> The configuration file is
specified
+ * via the context init parameter <tt>brokerURI</tt>, typically: <code>
+ * <context-param>
+ * <param-name>brokerURI</param-name>
+ * <param-value>/WEB-INF/activemq.xml</param-value>
+ * </context-param>
+ * </code>
+ *
+ * As a a default, if a <tt>brokerURI</tt> is not specified it will look up
+ * for <tt>activemq.xml</tt>
+ *
+ * @version $Revision: 1.1 $
+ */
+public class SpringBrokerContextListener implements ServletContextListener {
+
+ /** broker uri context parameter name: <tt>brokerURI</tt> */
+ public static final String INIT_PARAM_BROKER_URI = "brokerURI";
+
+ /** the broker container instance */
+ private BrokerService brokerContainer;
+
+ /**
+ * Set the broker container to be used by this listener
+ *
+ * @param container
+ * the container to be used.
+ */
+ protected void setBrokerService(BrokerService container) {
+ this.brokerContainer = container;
+ }
+
+ /**
+ * Return the broker container.
+ */
+ protected BrokerService getBrokerService() {
+ return this.brokerContainer;
+ }
+
+ public void contextInitialized(ServletContextEvent event) {
+ ServletContext context = event.getServletContext();
+ context.log("Creating ActiveMQ Broker...");
+ brokerContainer = createBroker(context);
+
+ context.log("Starting ActiveMQ Broker");
+ try {
+ brokerContainer.start();
+
+ context.log("Started ActiveMQ Broker");
+ }
+ catch (Exception e) {
+ context.log("Failed to start ActiveMQ broker: " + e, e);
+ }
+ }
+
+ public void contextDestroyed(ServletContextEvent event) {
+ ServletContext context = event.getServletContext();
+ if (brokerContainer != null) {
+ try {
+ brokerContainer.stop();
+ }
+ catch (Exception e) {
+ context.log("Failed to stop the ActiveMQ Broker: " + e, e);
+ }
+ brokerContainer = null;
+ }
+ }
+
+ /**
+ * Factory method to create a new ActiveMQ Broker
+ */
+ protected BrokerService createBroker(ServletContext context) {
+ String brokerURI = context.getInitParameter(INIT_PARAM_BROKER_URI);
+ if (brokerURI == null) {
+ brokerURI = "activemq.xml";
+ }
+ context.log("Loading ActiveMQ Broker configuration from: " +
brokerURI);
+ Resource resource = new ServletContextResource(context, brokerURI);
+ BrokerFactoryBean factory = new BrokerFactoryBean(resource);
+ try {
+ factory.afterPropertiesSet();
+ }
+ catch (Exception e) {
+ context.log("Failed to create broker: " + e, e);
+ }
+ return factory.getBroker();
+ }
+}
Added:
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/WebClient.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/WebClient.java?rev=357943&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/WebClient.java
(added)
+++
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/WebClient.java
Tue Dec 20 01:54:54 2005
@@ -0,0 +1,257 @@
+/**
+ *
+ * Copyright 2004 Protique Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+
+package org.activemq.web;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpSession;
+import javax.servlet.http.HttpSessionActivationListener;
+import javax.servlet.http.HttpSessionEvent;
+
+import org.activemq.ActiveMQConnection;
+import org.activemq.ActiveMQConnectionFactory;
+import org.activemq.ActiveMQSession;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
+
+/**
+ * Represents a messaging client used from inside a web container
+ * typically stored inside a HttpSession
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class WebClient implements HttpSessionActivationListener,
Externalizable {
+ public static final String webClientAttribute = "org.activemq.webclient";
+ public static final String connectionFactoryAttribute =
"org.activemq.connectionFactory";
+ public static final String queueConsumersAttribute =
"org.activemq.queueConsumers";
+ public static final String brokerUrlInitParam = "org.activemq.brokerURL";
+ public static final String embeddedBrokerInitParam =
"org.activemq.embeddedBroker";
+
+ private static final Log log = LogFactory.getLog(WebClient.class);
+
+ private static transient ConnectionFactory factory;
+ private static transient Map queueConsumers;
+
+ private transient ServletContext context;
+ private transient ActiveMQConnection connection;
+ private transient ActiveMQSession session;
+ private transient MessageProducer producer;
+ private transient Map topicConsumers = new ConcurrentHashMap();
+ private int deliveryMode = DeliveryMode.NON_PERSISTENT;
+
+ private final Semaphore semaphore = new Semaphore(1);
+
+
+ /**
+ * @return the web client for the current HTTP session or null if there is
not a web client created yet
+ */
+ public static WebClient getWebClient(HttpSession session) {
+ return (WebClient) session.getAttribute(webClientAttribute);
+ }
+
+
+ public static void initContext(ServletContext context) {
+ factory = initConnectionFactory(context);
+ if (factory == null) {
+ log.warn("No ConnectionFactory available in the ServletContext
for: " + connectionFactoryAttribute);
+ factory = new ActiveMQConnectionFactory("vm://localhost");
+ context.setAttribute(connectionFactoryAttribute, factory);
+ }
+ queueConsumers = initQueueConsumers(context);
+ }
+
+ /**
+ * Only called by serialization
+ */
+ public WebClient() {
+ }
+
+ public WebClient(ServletContext context) {
+ this.context = context;
+ initContext(context);
+ }
+
+
+ public int getDeliveryMode() {
+ return deliveryMode;
+ }
+
+
+ public void setDeliveryMode(int deliveryMode) {
+ this.deliveryMode = deliveryMode;
+ }
+
+
+ public void start() throws JMSException {
+ }
+
+ public void stop() throws JMSException {
+ System.out.println("Closing the WebClient!!! " + this);
+
+ try {
+ connection.close();
+ }
+ finally {
+ producer = null;
+ session = null;
+ connection = null;
+ topicConsumers.clear();
+ }
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException {
+ }
+
+ public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
+ topicConsumers = new HashMap();
+ }
+
+ public void send(Destination destination, Message message) throws
JMSException {
+ if (producer == null) {
+ producer = getSession().createProducer(null);
+ producer.setDeliveryMode(deliveryMode );
+ }
+ producer.send(destination, message);
+ if (log.isDebugEnabled()) {
+ log.debug("Sent! to destination: " + destination + " message: " +
message);
+ }
+ }
+
+ public Session getSession() throws JMSException {
+ if (session == null) {
+ session = createSession();
+ }
+ return session;
+ }
+
+ public ActiveMQConnection getConnection() throws JMSException {
+ if (connection == null) {
+ connection = (ActiveMQConnection) factory.createConnection();
+ connection.start();
+ }
+ return connection;
+ }
+
+ public void sessionWillPassivate(HttpSessionEvent event) {
+ try {
+ stop();
+ }
+ catch (JMSException e) {
+ log.warn("Could not close connection: " + e, e);
+ }
+ }
+
+ public void sessionDidActivate(HttpSessionEvent event) {
+ // lets update the connection factory from the servlet context
+ context = event.getSession().getServletContext();
+ initContext(context);
+ }
+
+ public static Map initQueueConsumers(ServletContext context) {
+ Map answer = (Map) context.getAttribute(queueConsumersAttribute);
+ if (answer == null) {
+ answer = new HashMap();
+ context.setAttribute(queueConsumersAttribute, answer);
+ }
+ return answer;
+ }
+
+
+ public static ConnectionFactory initConnectionFactory(ServletContext
servletContext) {
+ ConnectionFactory connectionFactory = (ConnectionFactory)
servletContext.getAttribute(connectionFactoryAttribute);
+ if (connectionFactory == null) {
+ String brokerURL = (String)
servletContext.getInitParameter(brokerUrlInitParam);
+
+ servletContext.log("Value of: " + brokerUrlInitParam + " is: " +
brokerURL);
+
+ if (brokerURL == null) {
+ brokerURL = "vm://localhost";
+ }
+
+ boolean embeddedBroker =
MessageServletSupport.asBoolean(servletContext.getInitParameter(embeddedBrokerInitParam));
+ servletContext.log("Use embedded broker: " + embeddedBroker);
+
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(brokerURL);
+ factory.setUseEmbeddedBroker(embeddedBroker);
+
+ connectionFactory = factory;
+ servletContext.setAttribute(connectionFactoryAttribute,
connectionFactory);
+ }
+ return connectionFactory;
+ }
+
+ public synchronized MessageConsumer getConsumer(Destination destination)
throws JMSException {
+ if (destination instanceof Topic) {
+ MessageConsumer consumer = (MessageConsumer)
topicConsumers.get(destination);
+ if (consumer == null) {
+ consumer = getSession().createConsumer(destination);
+ topicConsumers.put(destination, consumer);
+ }
+ return consumer;
+ }
+ else {
+ synchronized (queueConsumers) {
+ SessionConsumerPair pair = (SessionConsumerPair)
queueConsumers.get(destination);
+ if (pair == null) {
+ pair = createSessionConsumerPair(destination);
+ queueConsumers.put(destination, pair);
+ }
+ return pair.consumer;
+ }
+ }
+ }
+
+ protected ActiveMQSession createSession() throws JMSException {
+ return (ActiveMQSession) getConnection().createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ }
+
+ protected SessionConsumerPair createSessionConsumerPair(Destination
destination) throws JMSException {
+ SessionConsumerPair answer = new SessionConsumerPair();
+ answer.session = createSession();
+ answer.consumer = answer.session.createConsumer(destination);
+ return answer;
+ }
+
+ protected static class SessionConsumerPair {
+ public Session session;
+ public MessageConsumer consumer;
+ }
+
+ public Semaphore getSemaphore() {
+ return semaphore;
+ }
+}
Added:
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/package.html
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/package.html?rev=357943&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/package.html
(added)
+++
incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/package.html
Tue Dec 20 01:54:54 2005
@@ -0,0 +1,13 @@
+<html>
+<head>
+</head>
+<body>
+
+<p>
+ Web Connectors so that messages can be sent via HTTP POST or read via
+ HTTP POST or GET as well as support for web streaming to we browser or
+ JavaScript clients.
+</p>
+
+</body>
+</html>