https://issues.apache.org/jira/browse/AMQ-5220
Fixes empty message bodies from responses to statistics plugin queries over the STOMP transport. This closes #41 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fe09b748 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fe09b748 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fe09b748 Branch: refs/heads/activemq-5.10.x Commit: fe09b7482c98dab25086a879cd2891a62f8c3b20 Parents: fb3e96a Author: Timothy Bish <[email protected]> Authored: Thu Aug 7 14:18:40 2014 -0400 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Dec 17 19:23:55 2014 -0500 ---------------------------------------------------------------------- .../transport/stomp/FrameTranslator.java | 5 +- .../transport/stomp/JmsFrameTranslator.java | 184 ++++++++++--------- .../transport/stomp/ProtocolConverter.java | 15 +- .../apache/activemq/transport/stomp/Stomp.java | 5 + .../transport/stomp/StompAdvisoryTest.java | 58 ++++++ .../transport/stomp/StompTestSupport.java | 5 + 6 files changed, 174 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/fe09b748/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java ---------------------------------------------------------------------- diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java index d37d364..7496472 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java @@ -27,12 +27,13 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; /** - * Implementations of this interface are used to map back and forth from Stomp + * Implementations of this interface are used to map back and forth from STOMP * to ActiveMQ. There are several standard mappings which are semantically the * same, the inner class, Helper, provides functions to copy those properties * from one to the other */ public interface FrameTranslator { + ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame frame) throws JMSException, ProtocolException; StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException; @@ -142,7 +143,7 @@ public interface FrameTranslator { msg.setPersistent("true".equals(o)); } - // Stomp specific headers + // STOMP specific headers headers.remove(Stomp.Headers.RECEIPT_REQUESTED); // Since we take the rest of the header and put them in properties which could then http://git-wip-us.apache.org/repos/asf/activemq/blob/fe09b748/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java ---------------------------------------------------------------------- diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java index 6ae68fc..3525b23 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.transport.stomp; +import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage; +import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame; + import java.io.IOException; import java.io.Serializable; import java.io.StringReader; @@ -33,6 +36,9 @@ import org.apache.activemq.command.ActiveMQMapMessage; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.DataStructure; +import org.apache.activemq.transport.stomp.Stomp.Headers; +import org.apache.activemq.transport.stomp.Stomp.Responses; +import org.apache.activemq.transport.stomp.Stomp.Transformations; import org.codehaus.jettison.mapped.Configuration; import org.fusesource.hawtbuf.UTF8Buffer; @@ -49,133 +55,129 @@ import com.thoughtworks.xstream.io.xml.xppdom.XppFactory; /** * Frame translator implementation that uses XStream to convert messages to and * from XML and JSON - * - * @author <a href="mailto:[email protected]">Dejan Bosanac</a> */ -public class JmsFrameTranslator extends LegacyFrameTranslator implements - BrokerContextAware { +public class JmsFrameTranslator extends LegacyFrameTranslator implements BrokerContextAware { XStream xStream = null; BrokerContext brokerContext; @Override - public ActiveMQMessage convertFrame(ProtocolConverter converter, - StompFrame command) throws JMSException, ProtocolException { + public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException { Map<String, String> headers = command.getHeaders(); ActiveMQMessage msg; - String transformation = headers.get(Stomp.Headers.TRANSFORMATION); - if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) { + String transformation = headers.get(Headers.TRANSFORMATION); + if (headers.containsKey(Headers.CONTENT_LENGTH) || transformation.equals(Transformations.JMS_BYTE.toString())) { msg = super.convertFrame(converter, command); } else { HierarchicalStreamReader in; try { String text = new String(command.getContent(), "UTF-8"); - switch (Stomp.Transformations.getValue(transformation)) { - case JMS_OBJECT_XML: - in = new XppReader(new StringReader(text), XppFactory.createDefaultParser()); - msg = createObjectMessage(in); - break; - case JMS_OBJECT_JSON: - in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); - msg = createObjectMessage(in); - break; - case JMS_MAP_XML: - in = new XppReader(new StringReader(text), XppFactory.createDefaultParser()); - msg = createMapMessage(in); - break; - case JMS_MAP_JSON: - in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); - msg = createMapMessage(in); - break; - default: - throw new Exception("Unkown transformation: " + transformation); + switch (Transformations.getValue(transformation)) { + case JMS_OBJECT_XML: + in = new XppReader(new StringReader(text), XppFactory.createDefaultParser()); + msg = createObjectMessage(in); + break; + case JMS_OBJECT_JSON: + in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); + msg = createObjectMessage(in); + break; + case JMS_MAP_XML: + in = new XppReader(new StringReader(text), XppFactory.createDefaultParser()); + msg = createMapMessage(in); + break; + case JMS_MAP_JSON: + in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); + msg = createMapMessage(in); + break; + default: + throw new Exception("Unkown transformation: " + transformation); } } catch (Throwable e) { - command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage()); + command.getHeaders().put(Headers.TRANSFORMATION_ERROR, e.getMessage()); msg = super.convertFrame(converter, command); } } - FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this); + + copyStandardHeadersFromFrameToMessage(converter, command, msg, this); return msg; } @Override - public StompFrame convertMessage(ProtocolConverter converter, - ActiveMQMessage message) throws IOException, JMSException { + public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException { + + StompFrame command = new StompFrame(); + command.setAction(Responses.MESSAGE); + Map<String, String> headers = new HashMap<String, String>(25); + command.setHeaders(headers); + + copyStandardHeadersFromMessageToFrame(converter, message, command, this); + + String transformation = headers.get(Headers.TRANSFORMATION); if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) { - StompFrame command = new StompFrame(); - command.setAction(Stomp.Responses.MESSAGE); - Map<String, String> headers = new HashMap<String, String>(25); - command.setHeaders(headers); - - FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( - converter, message, command, this); - - if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { - headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString()); - } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) { - headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString()); + + if (Transformations.JMS_XML.equals(transformation)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_XML.toString()); + } else if (Transformations.JMS_JSON.equals(transformation)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_JSON.toString()); + } + + if (!headers.containsKey(Headers.TRANSFORMATION)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_XML.toString()); } ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy(); - command.setContent(marshall(msg.getObject(), - headers.get(Stomp.Headers.TRANSFORMATION)) - .getBytes("UTF-8")); - return command; + command.setContent(marshall(msg.getObject(), headers.get(Headers.TRANSFORMATION)).getBytes("UTF-8")); } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { - StompFrame command = new StompFrame(); - command.setAction(Stomp.Responses.MESSAGE); - Map<String, String> headers = new HashMap<String, String>(25); - command.setHeaders(headers); - - FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( - converter, message, command, this); - - if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { - headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString()); - } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) { - headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString()); + + if (Transformations.JMS_XML.equals(transformation)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_XML.toString()); + } else if (Transformations.JMS_JSON.equals(transformation)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_JSON.toString()); + } + + if (!headers.containsKey(Headers.TRANSFORMATION)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_XML.toString()); } ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); - command.setContent(marshall((Serializable)msg.getContentMap(), - headers.get(Stomp.Headers.TRANSFORMATION)).getBytes("UTF-8")); - return command; - } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && - AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) { - - StompFrame command = new StompFrame(); - command.setAction(Stomp.Responses.MESSAGE); - Map<String, String> headers = new HashMap<String, String>(25); - command.setHeaders(headers); - - FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( - converter, message, command, this); - - if (!headers.containsKey(Stomp.Headers.TRANSFORMATION)) { - headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString()); + command.setContent(marshall((Serializable) msg.getContentMap(), headers.get(Headers.TRANSFORMATION)).getBytes("UTF-8")); + + } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) { + + if (Transformations.JMS_XML.equals(transformation)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_XML.toString()); + } else if (Transformations.JMS_JSON.equals(transformation)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_JSON.toString()); } - if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { - headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString()); - } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) { - headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString()); + if (!headers.containsKey(Headers.TRANSFORMATION)) { + headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_JSON.toString()); } - String body = marshallAdvisory(message.getDataStructure(), - headers.get(Stomp.Headers.TRANSFORMATION)); + String body = marshallAdvisory(message.getDataStructure(), headers.get(Headers.TRANSFORMATION)); command.setContent(body.getBytes("UTF-8")); - return command; + } else { - return super.convertMessage(converter, message); + command = super.convertMessage(converter, message); } + + return command; } /** - * Marshalls the Object to a string using XML or JSON encoding + * Marshal the Object to a string using XML or JSON encoding + * + * @param object + * the object to marshal + * @param transformation + * the transformation to apply to the object. + * + * @returns the marshaled form of the given object, in JSON or XML. + * + * @throws JMSException if an error occurs during the marshal operation. */ protected String marshall(Serializable object, String transformation) throws JMSException { StringWriter buffer = new StringWriter(); @@ -199,7 +201,7 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements @SuppressWarnings("unchecked") protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException { ActiveMQMapMessage mapMsg = new ActiveMQMapMessage(); - Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in); + Map<String, Object> map = (Map<String, Object>) getXStream().unmarshal(in); for (String key : map.keySet()) { mapMsg.setObject(key, map.get(key)); } @@ -256,8 +258,9 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements xstream.ignoreUnknownElements(); } - // For any object whose elements contains an UTF8Buffer instance instead of a String - // type we map it to String both in and out such that we don't marshal UTF8Buffers out + // For any object whose elements contains an UTF8Buffer instance instead + // of a String type we map it to String both in and out such that we don't + // marshal UTF8Buffers out xstream.registerConverter(new AbstractSingleValueConverter() { @Override @@ -283,14 +286,17 @@ public class JmsFrameTranslator extends LegacyFrameTranslator implements } @Override - public BrokerContext getBrokerContext() { + public BrokerContext getBrokerContext() { return this.brokerContext; } /** * Return an Advisory message as a JSON formatted string + * * @param ds - * @return + * the DataStructure instance that is being marshaled. + * + * @return the JSON marshaled form of the given DataStructure instance. */ protected String marshallAdvisory(final DataStructure ds) { XStream xstream = new XStream(new JsonHierarchicalStreamDriver()); http://git-wip-us.apache.org/repos/asf/activemq/blob/fe09b748/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index 0ed08e4..edefb15 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -204,17 +204,16 @@ public class ProtocolConverter { } protected FrameTranslator findTranslator(String header) { - return findTranslator(header, null); + return findTranslator(header, null, false); } - protected FrameTranslator findTranslator(String header, ActiveMQDestination destination) { + protected FrameTranslator findTranslator(String header, ActiveMQDestination destination, boolean advisory) { FrameTranslator translator = frameTranslator; try { if (header != null) { - translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER - .newInstance(header); + translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header); } else { - if (destination != null && AdvisorySupport.isAdvisoryTopic(destination)) { + if (destination != null && (advisory || AdvisorySupport.isAdvisoryTopic(destination))) { translator = new JmsFrameTranslator(); } } @@ -230,7 +229,7 @@ public class ProtocolConverter { } /** - * Convert a stomp command + * Convert a STOMP command * * @param command */ @@ -894,7 +893,9 @@ public class ProtocolConverter { if (ignoreTransformation == true) { return frameTranslator.convertMessage(this, message); } else { - return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination()).convertMessage(this, message); + FrameTranslator translator = findTranslator( + message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination(), message.isAdvisory()); + return translator.convertMessage(this, message); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/fe09b748/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java ---------------------------------------------------------------------- diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java index a66b5ee..767e947 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java @@ -176,10 +176,15 @@ public interface Stomp { JMS_ADVISORY_XML, JMS_ADVISORY_JSON; + @Override public String toString() { return name().replaceAll("_", "-").toLowerCase(Locale.ENGLISH); } + public boolean equals(String value) { + return toString().equals(value); + } + public static Transformations getValue(String value) { return valueOf(value.replaceAll("-", "_").toUpperCase(Locale.ENGLISH)); } http://git-wip-us.apache.org/repos/asf/activemq/blob/fe09b748/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java ---------------------------------------------------------------------- diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java index 10d09b0..cc78308 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java @@ -22,29 +22,41 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.HashMap; +import java.util.List; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; +import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.plugin.StatisticsBrokerPlugin; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StompAdvisoryTest extends StompTestSupport { + static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination"; + private static final Logger LOG = LoggerFactory.getLogger(StompAdvisoryTest.class); protected ActiveMQConnection connection; @Override + protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws Exception { + plugins.add(new StatisticsBrokerPlugin()); + } + + @Override protected void applyBrokerPolicies() throws Exception { PolicyEntry policy = new PolicyEntry(); @@ -269,4 +281,50 @@ public class StompAdvisoryTest extends StompTestSupport { c.stop(); c.close(); } + + @Test + public void testStatisticsAdvisory() throws Exception { + Connection c = cf.createConnection("system", "manager"); + c.start(); + final Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Topic replyTo = session.createTopic("stats"); + + // Dummy Queue used to later gather statistics. + final ActiveMQQueue testQueue = new ActiveMQQueue("queueToBeTestedForStats"); + final MessageProducer producer = session.createProducer(null); + Message mess = session.createTextMessage("test"); + producer.send(testQueue, mess); + + // Create a request for Queue statistics + Thread child = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(1000); + Queue query = session.createQueue(STATS_DESTINATION_PREFIX + testQueue.getQueueName()); + Message msg = session.createMessage(); + msg.setJMSReplyTo(replyTo); + producer.send(query, msg); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + child.start(); + + // Attempt to gather the statistics response from the previous request. + stompConnection.connect("system", "manager"); + stompConnection.subscribe("/topic/" + replyTo.getTopicName(), Stomp.Headers.Subscribe.AckModeValues.AUTO); + stompConnection.begin("TX"); + StompFrame f = stompConnection.receive(5000); + stompConnection.commit("TX"); + + LOG.debug(f.toString()); + assertEquals(f.getAction(),"MESSAGE"); + assertTrue("Should have a body", f.getBody().length() > 0); + assertTrue("Should contains memoryUsage stats", f.getBody().contains("memoryUsage")); + + c.stop(); + c.close(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/fe09b748/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java index 3cf1356..e763552 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java @@ -146,6 +146,8 @@ public class StompTestSupport { plugins.add(configureAuthentication()); } + addAdditionalPlugins(plugins); + if (!plugins.isEmpty()) { BrokerPlugin[] array = new BrokerPlugin[plugins.size()]; brokerService.setPlugins(plugins.toArray(array)); @@ -172,6 +174,9 @@ public class StompTestSupport { brokerService.setJobSchedulerStore(jobStore); } + protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws Exception { + } + protected BrokerPlugin configureAuthentication() throws Exception { List<AuthenticationUser> users = new ArrayList<AuthenticationUser>(); users.add(new AuthenticationUser("system", "manager", "users,admins"));
