Repository: activemq-artemis Updated Branches: refs/heads/refactor-openwire fa50a9131 -> b6189fffa
Fixed some test failures Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b6189fff Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b6189fff Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b6189fff Branch: refs/heads/refactor-openwire Commit: b6189fffa8914b78f85c19e687ba4f58a7f68028 Parents: fa50a91 Author: Howard Gao <[email protected]> Authored: Thu Feb 18 20:52:32 2016 +0800 Committer: Howard Gao <[email protected]> Committed: Thu Feb 18 20:52:32 2016 +0800 ---------------------------------------------------------------------- .../openwire/OpenWireMessageConverter.java | 7 +++++++ .../core/protocol/openwire/amq/AMQSession.java | 1 + .../artemiswrapper/ArtemisBrokerWrapper.java | 14 ++++++++++++++ .../org/apache/activemq/broker/BrokerTest.java | 19 +++++++++++++++---- 4 files changed, 37 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b6189fff/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 2b863c1..6e8b07d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -400,6 +400,13 @@ public class OpenWireMessageConverter implements MessageConverter { coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId); } coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable()); + + ActiveMQDestination origDest = messageSend.getOriginalDestination(); + if (origDest != null) { + ByteSequence origDestBytes = marshaller.marshal(origDest); + origDestBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data); + } } private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b6189fff/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 701c9ce..fe8d3c4 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -255,6 +255,7 @@ public class AMQSession implements SessionCallback { ActiveMQDestination[] actualDestinations = null; if (destination.isComposite()) { actualDestinations = destination.getCompositeDestinations(); + messageSend.setOriginalDestination(destination); } else { actualDestinations = new ActiveMQDestination[]{destination}; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b6189fff/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java index 3ad6072..5cb5048 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java @@ -29,9 +29,12 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration; +import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.registry.JndiBindingRegistry; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; @@ -257,4 +260,15 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { } } } + + public long getAMQueueMessageCount(String physicalName) { + long count = 0; + String qname = "jms.queue." + physicalName; + Binding binding = server.getPostOffice().getBinding(new SimpleString(qname)); + if (binding != null) { + QueueImpl q = (QueueImpl) binding.getBindable(); + count = q.getMessageCount(); + } + return count; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b6189fff/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java index 1e83319..9f412a9 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java @@ -24,6 +24,7 @@ import javax.jms.DeliveryMode; import junit.framework.Test; +import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -454,9 +455,13 @@ public class BrokerTest extends BrokerTestSupport { // Commit the transaction. connection1.send(createCommitTransaction1Phase(connectionInfo1, txid)); + //due to async tx operations, we need some time for message count to go down + Thread.sleep(1000); + ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker(); + long messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName()); // The queue should now only have the remaining 2 messages - assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination)); + assertEquals(2, messageCount); } public void initCombosForTestConsumerCloseCausesRedelivery() { @@ -1463,11 +1468,17 @@ public class BrokerTest extends BrokerTestSupport { assertNotNull(m); assertEquals(m.getMessageId(), message1.getMessageId()); - assertTrue(countMessagesInQueue(connection, connectionInfo, destination) == 2); + ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker(); + long messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName()); + assertTrue(messageCount == 2); connection.send(createAck(consumerInfo, m, 1, MessageAck.DELIVERED_ACK_TYPE)); - assertTrue(countMessagesInQueue(connection, connectionInfo, destination) == 2); + messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName()); + assertTrue(messageCount == 2); connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); - assertTrue(countMessagesInQueue(connection, connectionInfo, destination) == 1); + //give some time for broker to count down + Thread.sleep(2000); + messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName()); + assertTrue(messageCount == 1); }
