http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/activemq.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/activemq.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/activemq.xml new file mode 100644 index 0000000..511d6d1 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/activemq.xml @@ -0,0 +1,51 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<!-- START SNIPPET: example --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:amq="http://activemq.apache.org/schema/core" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <!-- Loads usage.properties so that its entries (name, limit, delta) can be used as ${...} variables in this configuration file --> + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> + <property name="location" value="org/apache/activemq/memory/usage.properties"/> + </bean> + + <broker xmlns="http://activemq.apache.org/schema/core" brokerName="${name}" persistent="false"> + + <!-- Use the following to set the broker memory, store and temp usage limits --> + <systemUsage> + <systemUsage> + <memoryUsage> + <memoryUsage limit="${limit}" percentUsageMinDelta="${delta}"/> + </memoryUsage> + <storeUsage> + <storeUsage limit="1 gb" name="foo"/> + </storeUsage> + <tempUsage> + <tempUsage limit="100 mb"/> + </tempUsage> + </systemUsage> + </systemUsage> + + </broker> + +</beans> +<!-- END SNIPPET: example -->
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/DummyMessage.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/DummyMessage.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/DummyMessage.java new file mode 100644 index 0000000..e5823d8 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/DummyMessage.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.memory.buffer; + +import org.apache.activemq.command.ActiveMQMessage; + +/** + * A message implementation which is useful for testing as we can spoof its size + * + * + */ +public class DummyMessage extends ActiveMQMessage { + + private int size; + + public DummyMessage(int size) { + this.size = size; + } + + public int getSize() { + return size; + } + + public String toString() { + return "DummyMessage[id=" + getMessageId() + " size=" + size + "]"; + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/MemoryBufferTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/MemoryBufferTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/MemoryBufferTestSupport.java new file mode 100644 index 0000000..ea8f0a6 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/MemoryBufferTestSupport.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.memory.buffer; + +import junit.framework.TestCase; + +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.memory.buffer.MessageBuffer; +import org.apache.activemq.memory.buffer.MessageQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * + */ +public abstract class MemoryBufferTestSupport extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(MemoryBufferTestSupport.class); + + protected MessageBuffer buffer = createMessageBuffer(); + protected MessageQueue qA = buffer.createMessageQueue(); + protected MessageQueue qB = buffer.createMessageQueue(); + protected MessageQueue qC = buffer.createMessageQueue(); + protected int messageCount; + + protected abstract MessageBuffer createMessageBuffer(); + + protected void setUp() throws Exception { + buffer = createMessageBuffer(); + qA = buffer.createMessageQueue(); + qB = buffer.createMessageQueue(); + qC = buffer.createMessageQueue(); + } + + protected void dump() { + LOG.info("Dumping current state"); + dumpQueue(qA, "A"); + dumpQueue(qB, "B"); + dumpQueue(qC, "C"); + } + + protected void dumpQueue(MessageQueue queue, String name) { + LOG.info(" " + name + " = " + queue.getList()); + } + + protected ActiveMQMessage createMessage(int size) throws Exception { + DummyMessage answer = new DummyMessage(size); + answer.setIntProperty("counter", ++messageCount); + answer.setJMSMessageID("" + messageCount); + return answer; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/OrderBasedMemoryBufferTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/OrderBasedMemoryBufferTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/OrderBasedMemoryBufferTest.java new file mode 
100644 index 0000000..2e771f2 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/OrderBasedMemoryBufferTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.memory.buffer; + +import org.apache.activemq.memory.buffer.MessageBuffer; +import org.apache.activemq.memory.buffer.OrderBasedMessageBuffer; + + +/** + * + * + */ +public class OrderBasedMemoryBufferTest extends MemoryBufferTestSupport { + + public void testSizeWorks() throws Exception { + qA.add(createMessage(10)); + qB.add(createMessage(10)); + qB.add(createMessage(10)); + qC.add(createMessage(10)); + + dump(); + + assertEquals("buffer size", 40, buffer.getSize()); + assertEquals("qA", 10, qA.getSize()); + assertEquals("qB", 20, qB.getSize()); + assertEquals("qC", 10, qC.getSize()); + + qC.add(createMessage(10)); + + dump(); + + assertEquals("buffer size", 40, buffer.getSize()); + assertEquals("qA", 0, qA.getSize()); + assertEquals("qB", 20, qB.getSize()); + assertEquals("qC", 20, qC.getSize()); + + qB.add(createMessage(10)); + + dump(); + + assertEquals("buffer size", 40, buffer.getSize()); + assertEquals("qA", 0, qA.getSize()); + assertEquals("qB", 20, 
qB.getSize()); + assertEquals("qC", 20, qC.getSize()); + + qA.add(createMessage(10)); + + dump(); + + assertEquals("buffer size", 40, buffer.getSize()); + assertEquals("qA", 10, qA.getSize()); + assertEquals("qB", 10, qB.getSize()); + assertEquals("qC", 20, qC.getSize()); + } + + + protected MessageBuffer createMessageBuffer() { + return new OrderBasedMessageBuffer(40); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/SizeBasedMessageBufferTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/SizeBasedMessageBufferTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/SizeBasedMessageBufferTest.java new file mode 100644 index 0000000..ad02821 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/buffer/SizeBasedMessageBufferTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.memory.buffer; + +import org.apache.activemq.memory.buffer.MessageBuffer; +import org.apache.activemq.memory.buffer.SizeBasedMessageBuffer; + + +/** + * + * + */ +public class SizeBasedMessageBufferTest extends MemoryBufferTestSupport { + + public void testSizeWorks() throws Exception { + qA.add(createMessage(10)); + qB.add(createMessage(10)); + qB.add(createMessage(10)); + qC.add(createMessage(10)); + + dump(); + + assertEquals("buffer size", 40, buffer.getSize()); + assertEquals("qA", 10, qA.getSize()); + assertEquals("qB", 20, qB.getSize()); + assertEquals("qC", 10, qC.getSize()); + + // now lets force an eviction + qC.add(createMessage(10)); + + dump(); + + assertEquals("buffer size", 40, buffer.getSize()); + assertEquals("qA", 10, qA.getSize()); + assertEquals("qB", 10, qB.getSize()); + assertEquals("qC", 20, qC.getSize()); + } + + + protected MessageBuffer createMessageBuffer() { + return new SizeBasedMessageBuffer(40); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/usage.properties ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/usage.properties b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/usage.properties new file mode 100644 index 0000000..b5d33d1 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/memory/usage.properties @@ -0,0 +1,19 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. 
+## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +limit=1k +name=test-broker +delta=34 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java new file mode 100644 index 0000000..9f085b4 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java @@ -0,0 +1,629 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageNotWriteableException; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.management.ObjectName; + +import javax.management.openmbean.CompositeData; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerTestSupport; +import org.apache.activemq.broker.StubConnection; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.jmx.ManagementContext; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ConnectionId; +import 
org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.util.Wait; +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class duplicates most of the functionality in {@link NetworkTestSupport} + * and {@link BrokerTestSupport} because more control was needed over how brokers + * and connectors are created. Also, this test asserts message counts via JMX on + * each broker. 
+ */ +public class BrokerNetworkWithStuckMessagesTest { + + private static final Logger LOG = LoggerFactory.getLogger(BrokerNetworkWithStuckMessagesTest.class); + + private BrokerService localBroker; + private BrokerService remoteBroker; + private BrokerService secondRemoteBroker; + private DemandForwardingBridge bridge; + + protected Map<String, BrokerService> brokers = new HashMap<String, BrokerService>(); + protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>(); + + protected TransportConnector connector; + protected TransportConnector remoteConnector; + protected TransportConnector secondRemoteConnector; + + protected long idGenerator; + protected int msgIdGenerator; + protected int tempDestGenerator; + protected int maxWait = 4000; + protected String queueName = "TEST"; + + protected String amqDomain = "org.apache.activemq"; + + @Before + public void setUp() throws Exception { + + // For those who want visual confirmation: + // Uncomment the following to enable JMX support on a port number to use + // Jconsole to view each broker. You will need to add some calls to + // Thread.sleep() to be able to actually slow things down so that you + // can manually see JMX attrs. 
+// System.setProperty("com.sun.management.jmxremote", ""); +// System.setProperty("com.sun.management.jmxremote.port", "1099"); +// System.setProperty("com.sun.management.jmxremote.authenticate", "false"); +// System.setProperty("com.sun.management.jmxremote.ssl", "false"); + + // Create the local broker + createBroker(); + // Create the remote broker + createRemoteBroker(); + + // Remove the activemq-data directory from the creation of the remote broker + FileUtils.deleteDirectory(new File("activemq-data")); + + // Create a network bridge between the local and remote brokers so that + // demand-based forwarding can take place + NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); + config.setBrokerName("local"); + config.setDispatchAsync(false); + config.setDuplex(true); + + Transport localTransport = createTransport(); + Transport remoteTransport = createRemoteTransport(); + + // Create a network bridge between the two brokers + bridge = new DemandForwardingBridge(config, localTransport, remoteTransport); + bridge.setBrokerService(localBroker); + bridge.start(); + + + // introduce a second broker/bridge on remote that should not get any messages because of networkTtl=1 + // local <-> remote <-> secondRemote + createSecondRemoteBroker(); + config = new NetworkBridgeConfiguration(); + config.setBrokerName("remote"); + config.setDuplex(true); + + localTransport = createRemoteTransport(); + remoteTransport = createSecondRemoteTransport(); + + // Create a network bridge between the two brokers + bridge = new DemandForwardingBridge(config, localTransport, remoteTransport); + bridge.setBrokerService(remoteBroker); + bridge.start(); + + waitForBridgeFormation(); + } + + protected void waitForBridgeFormation() throws Exception { + for (final BrokerService broker : brokers.values()) { + if (!broker.getNetworkConnectors().isEmpty()) { + // Max wait here is 30 secs + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws 
Exception { + return !broker.getNetworkConnectors().get(0).activeBridges().isEmpty(); + }}); + } + } + } + + @After + public void tearDown() throws Exception { + bridge.stop(); + localBroker.stop(); + remoteBroker.stop(); + secondRemoteBroker.stop(); + } + + @Test(timeout=120000) + public void testBrokerNetworkWithStuckMessages() throws Exception { + + int sendNumMessages = 10; + int receiveNumMessages = 5; + + // Create a producer + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ProducerInfo producerInfo = createProducerInfo(sessionInfo1); + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.send(producerInfo); + + // Create a destination on the local broker + ActiveMQDestination destinationInfo1 = null; + + // Send a 10 messages to the local broker + for (int i = 0; i < sendNumMessages; ++i) { + destinationInfo1 = createDestinationInfo(connection1, connectionInfo1, ActiveMQDestination.QUEUE_TYPE); + connection1.request(createMessage(producerInfo, destinationInfo1, DeliveryMode.NON_PERSISTENT)); + } + + // Ensure that there are 10 messages on the local broker + Object[] messages = browseQueueWithJmx(localBroker); + assertEquals(sendNumMessages, messages.length); + + // Create a synchronous consumer on the remote broker + StubConnection connection2 = createRemoteConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + ActiveMQDestination destinationInfo2 = + createDestinationInfo(connection2, connectionInfo2, ActiveMQDestination.QUEUE_TYPE); + final ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationInfo2); + connection2.send(consumerInfo2); + + // Consume 5 of the messages from the remote broker and ack them. 
+ for (int i = 0; i < receiveNumMessages; ++i) { + Message message1 = receiveMessage(connection2, 20000); + assertNotNull(message1); + LOG.info("on remote, got: " + message1.getMessageId()); + connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE)); + assertTrue("JMSActiveMQBrokerPath property present and correct", + ((ActiveMQMessage)message1).getStringProperty(ActiveMQMessage.BROKER_PATH_PROPERTY).contains(localBroker.getBroker().getBrokerId().toString())); + } + + // Ensure that there are zero messages on the local broker. This tells + // us that those messages have been prefetched to the remote broker + // where the demand exists. + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Object[] result = browseQueueWithJmx(localBroker); + return 0 == result.length; + } + }); + messages = browseQueueWithJmx(localBroker); + assertEquals(0, messages.length); + + // try and pull the messages from remote, should be denied b/c on networkTtl + LOG.info("creating demand on second remote..."); + StubConnection connection3 = createSecondRemoteConnection(); + ConnectionInfo connectionInfo3 = createConnectionInfo(); + SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3); + connection3.send(connectionInfo3); + connection3.send(sessionInfo3); + ActiveMQDestination destinationInfo3 = + createDestinationInfo(connection3, connectionInfo3, ActiveMQDestination.QUEUE_TYPE); + final ConsumerInfo consumerInfoS3 = createConsumerInfo(sessionInfo3, destinationInfo3); + connection3.send(consumerInfoS3); + + Message messageExceedingTtl = receiveMessage(connection3, 5000); + if (messageExceedingTtl != null) { + LOG.error("got message on Second remote: " + messageExceedingTtl); + connection3.send(createAck(consumerInfoS3, messageExceedingTtl, 1, MessageAck.INDIVIDUAL_ACK_TYPE)); + } + + LOG.info("Closing consumer on remote"); + // Close the consumer on the remote broker + 
connection2.send(consumerInfo2.createRemoveCommand()); + // also close connection etc.. so messages get dropped from the local consumer q + connection2.send(connectionInfo2.createRemoveCommand()); + + // There should now be 5 messages stuck on the remote broker + assertTrue("correct stuck message count", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Object[] result = browseQueueWithJmx(remoteBroker); + return 5 == result.length; + } + })); + messages = browseQueueWithJmx(remoteBroker); + assertEquals(5, messages.length); + + assertTrue("can see broker path property", + ((String)((CompositeData)messages[1]).get("BrokerPath")).contains(localBroker.getBroker().getBrokerId().toString())); + + LOG.info("Messages now stuck on remote"); + + // receive again on the origin broker + ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destinationInfo1); + connection1.send(consumerInfo1); + LOG.info("create local consumer: " + consumerInfo1); + + Message message1 = receiveMessage(connection1, 20000); + assertNotNull("Expect to get a replay as remote consumer is gone", message1); + connection1.send(createAck(consumerInfo1, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE)); + LOG.info("acked one message on origin, waiting for all messages to percolate back"); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Object[] result = browseQueueWithJmx(localBroker); + return 4 == result.length; + } + }); + messages = browseQueueWithJmx(localBroker); + assertEquals(4, messages.length); + + LOG.info("checking for messages on remote again"); + // messages won't migrate back again till consumer closes + connection2 = createRemoteConnection(); + connectionInfo2 = createConnectionInfo(); + sessionInfo2 = createSessionInfo(connectionInfo2); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo2, 
destinationInfo2); + connection2.send(consumerInfo3); + message1 = receiveMessage(connection2, 20000); + assertNull("Messages have migrated back: " + message1, message1); + + // Consume the last 4 messages from the local broker and ack them just + // to clean up the queue. + int counter = 1; + for (; counter < receiveNumMessages; counter++) { + message1 = receiveMessage(connection1); + LOG.info("local consume of: " + (message1 != null ? message1.getMessageId() : " null")); + connection1.send(createAck(consumerInfo1, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE)); + } + // Ensure that 5 messages were received + assertEquals(receiveNumMessages, counter); + + // verify all messages consumed + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Object[] result = browseQueueWithJmx(remoteBroker); + return 0 == result.length; + } + }); + messages = browseQueueWithJmx(remoteBroker); + assertEquals(0, messages.length); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Object[] result = browseQueueWithJmx(localBroker); + return 0 == result.length; + } + }); + messages = browseQueueWithJmx(localBroker); + assertEquals(0, messages.length); + + // Close the consumer on the remote broker + connection2.send(consumerInfo3.createRemoveCommand()); + + connection1.stop(); + connection2.stop(); + connection3.stop(); + } + + protected BrokerService createBroker() throws Exception { + localBroker = new BrokerService(); + localBroker.setBrokerName("localhost"); + localBroker.setUseJmx(true); + localBroker.setPersistenceAdapter(null); + localBroker.setPersistent(false); + connector = createConnector(); + localBroker.addConnector(connector); + configureBroker(localBroker); + localBroker.start(); + localBroker.waitUntilStarted(); + + localBroker.getManagementContext().setConnectorPort(2221); + + brokers.put(localBroker.getBrokerName(), localBroker); + + return localBroker; + } + + private 
void configureBroker(BrokerService broker) { + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setExpireMessagesPeriod(0); + ConditionalNetworkBridgeFilterFactory filterFactory = new ConditionalNetworkBridgeFilterFactory(); + filterFactory.setReplayWhenNoConsumers(true); + defaultEntry.setNetworkBridgeFilterFactory(filterFactory); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + } + + protected BrokerService createRemoteBroker() throws Exception { + remoteBroker = new BrokerService(); + remoteBroker.setBrokerName("remotehost"); + remoteBroker.setUseJmx(true); + remoteBroker.setPersistenceAdapter(null); + remoteBroker.setPersistent(false); + remoteConnector = createRemoteConnector(); + remoteBroker.addConnector(remoteConnector); + configureBroker(remoteBroker); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + + remoteBroker.getManagementContext().setConnectorPort(2222); + + brokers.put(remoteBroker.getBrokerName(), remoteBroker); + + return remoteBroker; + } + + protected BrokerService createSecondRemoteBroker() throws Exception { + secondRemoteBroker = new BrokerService(); + secondRemoteBroker.setBrokerName("secondRemotehost"); + secondRemoteBroker.setUseJmx(false); + secondRemoteBroker.setPersistenceAdapter(null); + secondRemoteBroker.setPersistent(false); + secondRemoteConnector = createSecondRemoteConnector(); + secondRemoteBroker.addConnector(secondRemoteConnector); + configureBroker(secondRemoteBroker); + secondRemoteBroker.start(); + secondRemoteBroker.waitUntilStarted(); + + brokers.put(secondRemoteBroker.getBrokerName(), secondRemoteBroker); + + return secondRemoteBroker; + } + + protected Transport createTransport() throws Exception { + Transport transport = TransportFactory.connect(connector.getServer().getConnectURI()); + return transport; + } + + protected Transport createRemoteTransport() throws Exception { + Transport transport = 
TransportFactory.connect(remoteConnector.getServer().getConnectURI()); + return transport; + } + + protected Transport createSecondRemoteTransport() throws Exception { + Transport transport = TransportFactory.connect(secondRemoteConnector.getServer().getConnectURI()); + return transport; + } + + protected TransportConnector createConnector() throws Exception, IOException, URISyntaxException { + return new TransportConnector(TransportFactory.bind(new URI(getLocalURI()))); + } + + protected TransportConnector createRemoteConnector() throws Exception, IOException, URISyntaxException { + return new TransportConnector(TransportFactory.bind(new URI(getRemoteURI()))); + } + + protected TransportConnector createSecondRemoteConnector() throws Exception, IOException, URISyntaxException { + return new TransportConnector(TransportFactory.bind(new URI(getSecondRemoteURI()))); + } + + protected String getRemoteURI() { + return "vm://remotehost"; + } + + protected String getSecondRemoteURI() { + return "vm://secondRemotehost"; + } + + protected String getLocalURI() { + return "vm://localhost"; + } + + protected StubConnection createConnection() throws Exception { + Transport transport = TransportFactory.connect(connector.getServer().getConnectURI()); + StubConnection connection = new StubConnection(transport); + connections.add(connection); + return connection; + } + + protected StubConnection createRemoteConnection() throws Exception { + Transport transport = TransportFactory.connect(remoteConnector.getServer().getConnectURI()); + StubConnection connection = new StubConnection(transport); + connections.add(connection); + return connection; + } + + protected StubConnection createSecondRemoteConnection() throws Exception { + Transport transport = TransportFactory.connect(secondRemoteConnector.getServer().getConnectURI()); + StubConnection connection = new StubConnection(transport); + connections.add(connection); + return connection; + } + + @SuppressWarnings({ "unchecked", 
"unused" }) + private Object[] browseQueueWithJms(BrokerService broker) throws Exception { + Object[] messages = null; + Connection connection = null; + Session session = null; + + try { + URI brokerUri = connector.getUri(); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri.toString()); + connection = connectionFactory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue(queueName); + QueueBrowser browser = session.createBrowser(destination); + List<Message> list = new ArrayList<Message>(); + for (Enumeration<Message> enumn = browser.getEnumeration(); enumn.hasMoreElements();) { + list.add(enumn.nextElement()); + } + messages = list.toArray(); + } + finally { + if (session != null) { + session.close(); + } + if (connection != null) { + connection.close(); + } + } + LOG.info("+Browsed with JMS: " + messages.length); + + return messages; + } + + private Object[] browseQueueWithJmx(BrokerService broker) throws Exception { + Hashtable<String, String> params = new Hashtable<String, String>(); + params.put("brokerName", broker.getBrokerName()); + params.put("type", "Broker"); + params.put("destinationType", "Queue"); + params.put("destinationName", queueName); + ObjectName queueObjectName = ObjectName.getInstance(amqDomain, params); + + ManagementContext mgmtCtx = broker.getManagementContext(); + QueueViewMBean queueView = (QueueViewMBean)mgmtCtx.newProxyInstance(queueObjectName, QueueViewMBean.class, true); + + Object[] messages = queueView.browse(); + + LOG.info("+Browsed with JMX: " + messages.length); + + return messages; + } + + protected ConnectionInfo createConnectionInfo() throws Exception { + ConnectionInfo info = new ConnectionInfo(); + info.setConnectionId(new ConnectionId("connection:" + (++idGenerator))); + info.setClientId(info.getConnectionId().getValue()); + return info; + } + + protected SessionInfo 
createSessionInfo(ConnectionInfo connectionInfo) throws Exception { + SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator); + return info; + } + + protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception { + ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator); + return info; + } + + protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception { + ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator); + info.setBrowser(false); + info.setDestination(destination); + info.setPrefetchSize(1000); + info.setDispatchAsync(false); + return info; + } + + protected DestinationInfo createTempDestinationInfo(ConnectionInfo connectionInfo, byte destinationType) { + DestinationInfo info = new DestinationInfo(); + info.setConnectionId(connectionInfo.getConnectionId()); + info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); + info.setDestination(ActiveMQDestination.createDestination(info.getConnectionId() + ":" + (++tempDestGenerator), destinationType)); + return info; + } + + protected ActiveMQDestination createDestinationInfo(StubConnection connection, ConnectionInfo connectionInfo1, byte destinationType) throws Exception { + if ((destinationType & ActiveMQDestination.TEMP_MASK) != 0) { + DestinationInfo info = createTempDestinationInfo(connectionInfo1, destinationType); + connection.send(info); + return info.getDestination(); + } else { + return ActiveMQDestination.createDestination(queueName, destinationType); + } + } + + protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT); + return message; + } + + protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + 
message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator)); + message.setDestination(destination); + message.setPersistent(false); + try { + message.setText("Test Message Payload."); + } catch (MessageNotWriteableException e) { + // Not expected for a message created a few lines above; surface the failure rather than + // silently handing back a message without its payload. + throw new IllegalStateException("Could not set the payload of a newly created test message", e); + } + return message; + } + + protected MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) { + MessageAck ack = new MessageAck(); + ack.setAckType(ackType); + ack.setConsumerId(consumerInfo.getConsumerId()); + ack.setDestination(msg.getDestination()); + ack.setLastMessageId(msg.getMessageId()); + ack.setMessageCount(count); + return ack; + } + + public Message receiveMessage(StubConnection connection) throws InterruptedException { + return receiveMessage(connection, maxWait); + } + + public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException { + while (true) { + Object o = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS); + + if (o == null) { + return null; + } + if (o instanceof MessageDispatch) { + + MessageDispatch dispatch = (MessageDispatch)o; + if (dispatch.getMessage() == null) { + return null; + } + dispatch.setMessage(dispatch.getMessage().copy()); + dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter()); + return dispatch.getMessage(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java new file mode 100644 index 0000000..68681c6 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java @@ -0,0 +1,356 @@
+/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import java.io.File; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.net.ServerSocketFactory; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.jmx.ManagementContext; +import org.apache.activemq.command.Response; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.nio.NIOTransport; +import 
org.apache.activemq.transport.nio.NIOTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.wireformat.WireFormat; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import static org.junit.Assert.*; + +/** + * + * @author x22koe + */ +public class CheckDuplicateMessagesOnDuplexTest { + + private static final Logger log = LoggerFactory.getLogger(CheckDuplicateMessagesOnDuplexTest.class); + private BrokerService localBroker; + private BrokerService remoteBroker; + private ActiveMQConnectionFactory localFactory; + private ActiveMQConnectionFactory remoteFactory; + private Session localSession; + private MessageConsumer consumer; + private Session remoteSession; + private MessageProducer producer; + private Connection remoteConnection; + private Connection localConnection; + private DebugTransportFilter debugTransportFilter; + private boolean useLevelDB = false; + + public CheckDuplicateMessagesOnDuplexTest() { + } + + @BeforeClass + public static void setUpClass() { + } + + @AfterClass + public static void tearDownClass() { + } + + @Before + public void setUp() { + } + + @After + public void tearDown() { + } + + @Test + public void testConnectionLossBehaviorBeforeAckIsSent() throws Exception { + createBrokers(); + localBroker.deleteAllMessages(); + remoteBroker.deleteAllMessages(); + startBrokers(); + openConnections(); + + Thread.sleep(1000); + log.info("\n\n==============================================\nsend hello1\n"); + + // simulate network failure between REMOTE and LOCAL just before the reception response is sent back to REMOTE + debugTransportFilter.closeOnResponse = true; + + producer.send(remoteSession.createTextMessage("hello1")); + Message msg = consumer.receive(30000); + + assertNotNull("expected 
hello1", msg); + assertEquals("hello1", ((TextMessage) msg).getText()); + + Thread.sleep(1000); + log.info("\n\n------------------------------------------\nsend hello2\n"); + + producer.send(remoteSession.createTextMessage("hello2")); + msg = consumer.receive(30000); + + assertNotNull("expected hello2", msg); + assertEquals("hello2", ((TextMessage) msg).getText()); + + closeLocalConnection(); + + Thread.sleep(1000); + log.info("\n\n------------------------------------------\nsend hello3\n"); + + openLocalConnection(); + + Thread.sleep(1000); + + producer.send(remoteSession.createTextMessage("hello3")); + msg = consumer.receive(30000); + + assertNotNull("expected hello3", msg); + assertEquals("hello3", ((TextMessage) msg).getText()); + + Thread.sleep(1000); + log.info("\n\n==============================================\n\n"); + + closeConnections(); + stopBrokers(); + + // restart the local broker, which should be empty + + Thread.sleep(1000); + log.info("\n\n##############################################\n\n"); + + createLocalBroker(); + startLocalBroker(); + openLocalConnection(); + + // this should not return the "hello1" message + msg = consumer.receive(1000); + + closeLocalConnection(); + stopLocalBroker(); + + assertNull(msg); + } + + private void createBrokers() throws Exception { + createLocalBroker(); + createRemoteBroker(); + } + + private void createLocalBroker() throws Exception { + localBroker = new BrokerService(); + localBroker.setBrokerName("LOCAL"); + localBroker.setUseJmx(true); + localBroker.setSchedulePeriodForDestinationPurge(5000); + ManagementContext managementContext = new ManagementContext(); + managementContext.setCreateConnector(false); + localBroker.setManagementContext(managementContext); + PersistenceAdapter persistenceAdapter = persistanceAdapterFactory("target/local"); + localBroker.setPersistenceAdapter(persistenceAdapter); + List<TransportConnector> transportConnectors = new ArrayList<TransportConnector>(); + DebugTransportFactory 
tf = new DebugTransportFactory(); + TransportServer transport = tf.doBind(URI.create("nio://127.0.0.1:23539")); + TransportConnector transportConnector = new TransportConnector(transport); + transportConnector.setName("tc"); + transportConnector.setAuditNetworkProducers(true); + transportConnectors.add(transportConnector); + localBroker.setTransportConnectors(transportConnectors); + } + + private void createRemoteBroker() throws Exception { + remoteBroker = new BrokerService(); + remoteBroker.setBrokerName("REMOTE"); + remoteBroker.setUseJmx(true); + remoteBroker.setSchedulePeriodForDestinationPurge(5000); + ManagementContext managementContext = new ManagementContext(); + managementContext.setCreateConnector(false); + remoteBroker.setManagementContext(managementContext); + PersistenceAdapter persistenceAdapter = persistanceAdapterFactory("target/remote"); + remoteBroker.setPersistenceAdapter(persistenceAdapter); + List<NetworkConnector> networkConnectors = new ArrayList<NetworkConnector>(); + DiscoveryNetworkConnector networkConnector = new DiscoveryNetworkConnector(); + networkConnector.setName("to local"); + // set maxInactivityDuration to 0, otherwise the broker restarts while you are in the debugger + networkConnector.setUri(URI.create("static://(tcp://127.0.0.1:23539?wireFormat.maxInactivityDuration=0)")); + networkConnector.setDuplex(true); + //networkConnector.setNetworkTTL(5); + //networkConnector.setDynamicOnly(true); + networkConnector.setAlwaysSyncSend(true); + networkConnector.setDecreaseNetworkConsumerPriority(false); + networkConnector.setPrefetchSize(1); + networkConnector.setCheckDuplicateMessagesOnDuplex(true); + networkConnectors.add(networkConnector); + remoteBroker.setNetworkConnectors(networkConnectors); + } + + private void startBrokers() throws Exception { + startLocalBroker(); + startRemoteBroker(); + } + + private void startLocalBroker() throws Exception { + localBroker.start(); + localBroker.waitUntilStarted(); + } + + private void 
startRemoteBroker() throws Exception { + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + } + + private void openConnections() throws JMSException { + openLocalConnection(); + openRemoteConnection(); + } + + private void openLocalConnection() throws JMSException { + localFactory = new ActiveMQConnectionFactory(localBroker.getVmConnectorURI()); + //localFactory.setSendAcksAsync(false); + localConnection = localFactory.createConnection(); + localConnection.start(); + localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = localSession.createConsumer(localSession.createQueue("testqueue")); + } + + private void openRemoteConnection() throws JMSException { + remoteFactory = new ActiveMQConnectionFactory(remoteBroker.getVmConnectorURI()); + //remoteFactory.setSendAcksAsync(false); + remoteConnection = remoteFactory.createConnection(); + remoteConnection.start(); + remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = remoteSession.createProducer(remoteSession.createQueue("testqueue")); + } + + private void closeConnections() throws JMSException { + closeLocalConnection(); + closeRemoteConnection(); + } + + private void closeLocalConnection() throws JMSException { + localConnection.close(); + } + + private void closeRemoteConnection() throws JMSException { + remoteConnection.close(); + } + + private void stopBrokers() throws Exception { + stopRemoteBroker(); + stopLocalBroker(); + } + + private void stopLocalBroker() throws Exception { + localBroker.stop(); + localBroker.waitUntilStopped(); + } + + private void stopRemoteBroker() throws Exception { + remoteBroker.stop(); + remoteBroker.waitUntilStopped(); + } + + private PersistenceAdapter persistanceAdapterFactory(String path) { + if (useLevelDB) { + return persistanceAdapterFactory_LevelDB(path); + } else { + return persistanceAdapterFactory_KahaDB(path); + } + } + + private PersistenceAdapter persistanceAdapterFactory_KahaDB(String 
path) { + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter(); + kahaDBPersistenceAdapter.setDirectory(new File(path)); + kahaDBPersistenceAdapter.setIgnoreMissingJournalfiles(true); + kahaDBPersistenceAdapter.setCheckForCorruptJournalFiles(true); + kahaDBPersistenceAdapter.setChecksumJournalFiles(true); + return kahaDBPersistenceAdapter; + } + + private PersistenceAdapter persistanceAdapterFactory_LevelDB(String path) { + LevelDBPersistenceAdapter levelDBPersistenceAdapter = new LevelDBPersistenceAdapter(); + levelDBPersistenceAdapter.setDirectory(new File(path)); + return levelDBPersistenceAdapter; + } + + private class DebugTransportFactory extends NIOTransportFactory { + + @Override + protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) + throws IOException, URISyntaxException { + return new DebugTransportServer(this, location, serverSocketFactory); + } + } + + private class DebugTransportServer extends TcpTransportServer { + + public DebugTransportServer(TcpTransportFactory transportFactory, URI location, + ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + super(transportFactory, location, serverSocketFactory); + } + + @Override + protected Transport createTransport(Socket socket, WireFormat format) throws IOException { + Transport transport; + transport = new NIOTransport(format, socket); + debugTransportFilter = new DebugTransportFilter(transport); + return debugTransportFilter; + } + } + + private class DebugTransportFilter extends TransportFilter { + + boolean closeOnResponse = false; + + public DebugTransportFilter(Transport next) { + super(next); + } + + @Override + public void oneway(Object command) throws IOException { + if (closeOnResponse && command instanceof Response) { + closeOnResponse = false; + log.warn("\n\nclosing connection before response is sent\n\n"); + try { + ((NIOTransport) next).stop(); + } catch (Exception ex) { + 
log.error("couldn't stop niotransport", ex); + } + // don't send response + return; + } + super.oneway(command); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java new file mode 100644 index 0000000..58af6dc --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.StreamMessage; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.util.Wait; +import org.apache.activemq.xbean.BrokerFactoryBean; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; + +public class CompressionOverNetworkTest { + + protected static final int RECEIVE_TIMEOUT_MILLS = 10000; + protected static final int MESSAGE_COUNT = 10; + private static final Logger LOG = LoggerFactory.getLogger(CompressionOverNetworkTest.class); + + protected AbstractApplicationContext context; + protected Connection localConnection; + protected Connection remoteConnection; + protected BrokerService 
localBroker; + protected BrokerService remoteBroker; + protected Session localSession; + protected Session remoteSession; + protected ActiveMQDestination included; + + @Test + public void testCompressedOverCompressedNetwork() throws Exception { + + ActiveMQConnection localAmqConnection = (ActiveMQConnection) localConnection; + localAmqConnection.setUseCompression(true); + + MessageConsumer consumer1 = remoteSession.createConsumer(included); + MessageProducer producer = localSession.createProducer(included); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + waitForConsumerRegistration(localBroker, 1, included); + + StringBuilder payload = new StringBuilder("test-"); + for (int i = 0; i < 100; ++i) { + payload.append(UUID.randomUUID().toString()); + } + + Message test = localSession.createTextMessage(payload.toString()); + producer.send(test); + Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS); + assertNotNull(msg); + ActiveMQTextMessage message = (ActiveMQTextMessage) msg; + assertTrue(message.isCompressed()); + assertEquals(payload.toString(), message.getText()); + } + + @Test + public void testTextMessageCompression() throws Exception { + + MessageConsumer consumer1 = remoteSession.createConsumer(included); + MessageProducer producer = localSession.createProducer(included); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + waitForConsumerRegistration(localBroker, 1, included); + + StringBuilder payload = new StringBuilder("test-"); + for (int i = 0; i < 100; ++i) { + payload.append(UUID.randomUUID().toString()); + } + + Message test = localSession.createTextMessage(payload.toString()); + producer.send(test); + Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS); + assertNotNull(msg); + ActiveMQTextMessage message = (ActiveMQTextMessage) msg; + assertTrue(message.isCompressed()); + assertEquals(payload.toString(), message.getText()); + } + + @Test + public void testBytesMessageCompression() throws Exception { + + MessageConsumer 
consumer1 = remoteSession.createConsumer(included); + MessageProducer producer = localSession.createProducer(included); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + waitForConsumerRegistration(localBroker, 1, included); + + StringBuilder payload = new StringBuilder("test-"); + for (int i = 0; i < 100; ++i) { + payload.append(UUID.randomUUID().toString()); + } + + byte[] bytes = payload.toString().getBytes("UTF-8"); + + BytesMessage test = localSession.createBytesMessage(); + test.writeBytes(bytes); + producer.send(test); + Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS); + assertNotNull(msg); + ActiveMQBytesMessage message = (ActiveMQBytesMessage) msg; + assertTrue(message.isCompressed()); + assertTrue(message.getContent().getLength() < bytes.length); + + byte[] result = new byte[bytes.length]; + assertEquals(bytes.length, message.readBytes(result)); + assertEquals(-1, message.readBytes(result)); + + for(int i = 0; i < bytes.length; ++i) { + assertEquals(bytes[i], result[i]); + } + } + + @Test + public void testStreamMessageCompression() throws Exception { + + MessageConsumer consumer1 = remoteSession.createConsumer(included); + MessageProducer producer = localSession.createProducer(included); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + waitForConsumerRegistration(localBroker, 1, included); + + StreamMessage test = localSession.createStreamMessage(); + + for (int i = 0; i < 100; ++i) { + test.writeString("test string: " + i); + } + + producer.send(test); + Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS); + assertNotNull(msg); + ActiveMQStreamMessage message = (ActiveMQStreamMessage) msg; + assertTrue(message.isCompressed()); + + for (int i = 0; i < 100; ++i) { + assertEquals("test string: " + i, message.readString()); + } + } + + @Test + public void testMapMessageCompression() throws Exception { + + MessageConsumer consumer1 = remoteSession.createConsumer(included); + MessageProducer producer = 
localSession.createProducer(included); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + waitForConsumerRegistration(localBroker, 1, included); + + MapMessage test = localSession.createMapMessage(); + + for (int i = 0; i < 100; ++i) { + test.setString(Integer.toString(i), "test string: " + i); + } + + producer.send(test); + Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS); + assertNotNull(msg); + ActiveMQMapMessage message = (ActiveMQMapMessage) msg; + assertTrue(message.isCompressed()); + + for (int i = 0; i < 100; ++i) { + assertEquals("test string: " + i, message.getString(Integer.toString(i))); + } + } + + @Test + public void testObjectMessageCompression() throws Exception { + + MessageConsumer consumer1 = remoteSession.createConsumer(included); + MessageProducer producer = localSession.createProducer(included); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + waitForConsumerRegistration(localBroker, 1, included); + + StringBuilder payload = new StringBuilder("test-"); + for (int i = 0; i < 100; ++i) { + payload.append(UUID.randomUUID().toString()); + } + + Message test = localSession.createObjectMessage(payload.toString()); + producer.send(test); + Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS); + assertNotNull(msg); + ActiveMQObjectMessage message = (ActiveMQObjectMessage) msg; + assertTrue(message.isCompressed()); + assertEquals(payload.toString(), message.getObject()); + } + + private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception { + assertTrue("Internal bridge consumers registered in time", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Object[] bridges = brokerService.getNetworkConnectors().get(0).bridges.values().toArray(); + if (bridges.length > 0) { + LOG.info(brokerService + " bridges " + Arrays.toString(bridges)); + DemandForwardingBridgeSupport 
demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0]; + ConcurrentHashMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap(); + LOG.info(brokerService + " bridge " + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges); + if (!forwardingBridges.isEmpty()) { + for (DemandSubscription demandSubscription : forwardingBridges.values()) { + if (demandSubscription.getLocalInfo().getDestination().equals(destination)) { + LOG.info(brokerService + " DemandSubscription " + demandSubscription + ", size: " + demandSubscription.size()); + return demandSubscription.size() >= min; + } + } + } + } + return false; + } + })); + } + + @Before + public void setUp() throws Exception { + doSetUp(true); + } + + @After + public void tearDown() throws Exception { + doTearDown(); + } + + /** + * Closes both connections and stops both brokers. + * Every step is attempted even when an earlier one throws, and members that were + * never assigned (for example because set-up failed part-way) are skipped. + * + * @throws Exception the failure of the last step that threw, if any + */ + protected void doTearDown() throws Exception { + try { + if (localConnection != null) { + localConnection.close(); + } + } finally { + try { + if (remoteConnection != null) { + remoteConnection.close(); + } + } finally { + try { + if (localBroker != null) { + localBroker.stop(); + } + } finally { + if (remoteBroker != null) { + remoteBroker.stop(); + } + } + } + } + } + + protected void doSetUp(boolean deleteAllMessages) throws Exception { + localBroker = createLocalBroker(); + localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + localBroker.start(); + localBroker.waitUntilStarted(); + remoteBroker = createRemoteBroker(); + remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + URI localURI = localBroker.getVmConnectorURI(); + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI); + fac.setAlwaysSyncSend(true); + fac.setDispatchAsync(false); + localConnection = fac.createConnection(); + localConnection.setClientID("clientId"); + localConnection.start(); + URI remoteURI = remoteBroker.getVmConnectorURI(); + fac = new ActiveMQConnectionFactory(remoteURI); + remoteConnection = fac.createConnection(); + remoteConnection.setClientID("clientId"); + remoteConnection.start(); + included = new ActiveMQTopic("include.test.bar"); + localSession
= localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + protected String getRemoteBrokerURI() { + return "org/apache/activemq/network/remoteBroker.xml"; + } + + protected String getLocalBrokerURI() { + return "org/apache/activemq/network/localBroker.xml"; + } + + protected BrokerService createBroker(String uri) throws Exception { + Resource resource = new ClassPathResource(uri); + BrokerFactoryBean factory = new BrokerFactoryBean(resource); + resource = new ClassPathResource(uri); + factory = new BrokerFactoryBean(resource); + factory.afterPropertiesSet(); + BrokerService result = factory.getBroker(); + + for (NetworkConnector connector : result.getNetworkConnectors()) { + connector.setUseCompression(true); + } + + return result; + } + + protected BrokerService createLocalBroker() throws Exception { + return createBroker(getLocalBrokerURI()); + } + + protected BrokerService createRemoteBroker() throws Exception { + return createBroker(getRemoteBrokerURI()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java new file mode 100644 index 0000000..087ddd0 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import junit.framework.Test; + +import org.apache.activemq.broker.StubConnection; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; + +import java.util.Arrays; + + +public class DemandForwardingBridgeFilterTest extends NetworkTestSupport { + + private DemandForwardingBridge bridge; + + private StubConnection producerConnection; + + private ProducerInfo producerInfo; + + private StubConnection consumerConnection; + + private SessionInfo consumerSessionInfo; + + public void testWildcardOnExcludedDestination() throws Exception { + + NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration(); + + configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("OTHER.>", + ActiveMQDestination.TOPIC_TYPE))); + configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination( + "TEST", ActiveMQDestination.QUEUE_TYPE))); + + configureAndStartBridge(configuration); + + assertReceiveMessageOn("TEST", ActiveMQDestination.QUEUE_TYPE); + assertReceiveNoMessageOn("OTHER.T1", ActiveMQDestination.TOPIC_TYPE); + } + + public void 
testWildcardOnTwoExcludedDestination() throws Exception { + NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration(); + + configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.QUEUE_TYPE), + ActiveMQDestination.createDestination("TEST.X1", ActiveMQDestination.QUEUE_TYPE))); + configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination( + "TEST.X2", ActiveMQDestination.QUEUE_TYPE))); + + configureAndStartBridge(configuration); + + assertReceiveMessageOn("TEST.X2", ActiveMQDestination.QUEUE_TYPE); + assertReceiveNoMessageOn("OTHER.X1", ActiveMQDestination.QUEUE_TYPE); + assertReceiveNoMessageOn("TEST.X1", ActiveMQDestination.QUEUE_TYPE); + } + + + public void testWildcardOnDynamicallyIncludedDestination() throws Exception { + + NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration(); + + configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.QUEUE_TYPE), + ActiveMQDestination.createDestination("TEST.X2", ActiveMQDestination.QUEUE_TYPE))); + + configureAndStartBridge(configuration); + + + assertReceiveMessageOn("OTHER.X1", ActiveMQDestination.QUEUE_TYPE); + assertReceiveMessageOn("TEST.X2", ActiveMQDestination.QUEUE_TYPE); + } + + public void testDistinctTopicAndQueue() throws Exception { + + NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration(); + + configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination(">", + ActiveMQDestination.TOPIC_TYPE))); + configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination( + ">", ActiveMQDestination.QUEUE_TYPE))); + + configureAndStartBridge(configuration); + + assertReceiveMessageOn("TEST", ActiveMQDestination.QUEUE_TYPE); + assertReceiveNoMessageOn("TEST", ActiveMQDestination.TOPIC_TYPE); + } + + public void 
testListOfExcludedDestinationWithWildcard() throws Exception { + + NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration(); + + configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.TOPIC_TYPE), + ActiveMQDestination.createDestination("TEST.*", ActiveMQDestination.TOPIC_TYPE))); + configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination( + "TEST.X1", ActiveMQDestination.QUEUE_TYPE))); + + configureAndStartBridge(configuration); + + assertReceiveMessageOn("TEST.X1", ActiveMQDestination.QUEUE_TYPE); + assertReceiveNoMessageOn("OTHER.T1", ActiveMQDestination.TOPIC_TYPE); + assertReceiveNoMessageOn("OTHER.T2", ActiveMQDestination.TOPIC_TYPE); + } + + private void assertReceiveMessageOn(String destinationName, byte destinationType) throws Exception, + InterruptedException { + + ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, destinationType); + + // Send the message to the local broker. + producerConnection.send(createMessage(producerInfo, destination, destinationType)); + + // Make sure the message was delivered via the remote. + Message m = createConsumerAndReceiveMessage(destination); + + assertNotNull(m); + } + + private void assertReceiveNoMessageOn(String destinationName, byte destinationType) throws Exception, + InterruptedException { + + ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, destinationType); + + // Send the message to the local broker. + producerConnection.send(createMessage(producerInfo, destination, destinationType)); + + // Make sure the message was delivered via the remote. 
+ Message m = createConsumerAndReceiveMessage(destination); + assertNull(m); + } + + private Message createConsumerAndReceiveMessage(ActiveMQDestination destination) throws Exception { + // Now create remote consumer that should cause message to move to this + // remote consumer. + ConsumerInfo consumerInfo = createConsumerInfo(consumerSessionInfo, destination); + consumerConnection.send(consumerInfo); + + Message m = receiveMessage(consumerConnection); + return m; + } + + protected void setUp() throws Exception { + super.setUp(); + + + producerConnection = createConnection(); + ConnectionInfo producerConnectionInfo = createConnectionInfo(); + SessionInfo producerSessionInfo = createSessionInfo(producerConnectionInfo); + producerInfo = createProducerInfo(producerSessionInfo); + producerConnection.send(producerConnectionInfo); + producerConnection.send(producerSessionInfo); + producerConnection.send(producerInfo); + + consumerConnection = createRemoteConnection(); + ConnectionInfo consumerConnectionInfo = createConnectionInfo(); + consumerSessionInfo = createSessionInfo(consumerConnectionInfo); + consumerConnection.send(consumerConnectionInfo); + consumerConnection.send(consumerSessionInfo); + } + + protected void tearDown() throws Exception { + bridge.stop(); + super.tearDown(); + } + + public static Test suite() { + return suite(DemandForwardingBridgeFilterTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + public NetworkBridgeConfiguration getDefaultBridgeConfiguration() { + NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); + config.setBrokerName("local"); + config.setDispatchAsync(false); + return config; + } + + private void configureAndStartBridge(NetworkBridgeConfiguration configuration) throws Exception { + bridge = new DemandForwardingBridge(configuration, createTransport(), createRemoteTransport()); + bridge.setBrokerService(broker); + 
bridge.setDynamicallyIncludedDestinations(configuration.getDynamicallyIncludedDestinations().toArray( + new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()] + )); + bridge.setExcludedDestinations(configuration.getExcludedDestinations().toArray( + new ActiveMQDestination[configuration.getExcludedDestinations().size()] + )); + bridge.setStaticallyIncludedDestinations(configuration.getStaticallyIncludedDestinations().toArray( + new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()] + )); + bridge.start(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java new file mode 100644 index 0000000..9794337 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import javax.jms.DeliveryMode; + +import junit.framework.Test; + +import org.apache.activemq.broker.StubConnection; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.util.Wait; + +public class DemandForwardingBridgeTest extends NetworkTestSupport { + + public ActiveMQDestination destination; + public byte destinationType; + public int deliveryMode; + private DemandForwardingBridge bridge; + + public void initCombosForTestSendThenAddConsumer() { + addCombinationValues("deliveryMode", new Object[] {new Integer(DeliveryMode.NON_PERSISTENT), new Integer(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {new Byte(ActiveMQDestination.QUEUE_TYPE)}); + } + + public void testSendThenAddConsumer() throws Exception { + + // Start a producer on local broker + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ProducerInfo producerInfo = createProducerInfo(sessionInfo1); + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.send(producerInfo); + + destination = createDestinationInfo(connection1, connectionInfo1, destinationType); + + // Start a consumer on a remote broker + final StubConnection connection2 = createRemoteConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + 
connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + + // Send the message to the local broker. + connection1.send(createMessage(producerInfo, destination, deliveryMode)); + + // Verify that the message stayed on the local broker. + ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); + connection1.send(consumerInfo1); + Message m = receiveMessage(connection1); + assertNotNull(m); + // Close consumer to cause the message to rollback. + connection1.send(consumerInfo1.createRemoveCommand()); + + final DestinationStatistics destinationStatistics = broker.getDestination(destination).getDestinationStatistics(); + assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount()); + assertEquals("broker dest stat dequeues", 0, destinationStatistics.getDequeues().getCount()); + assertEquals("broker dest stat forwards", 0, destinationStatistics.getForwards().getCount()); + + // Now create remote consumer that should cause message to move to this + // remote consumer. + final ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); + connection2.request(consumerInfo2); + + // Make sure the message was delivered via the remote. 
+ assertTrue("message was received", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Message msg = receiveMessage(connection2); + if (msg != null) { + connection2.request(createAck(consumerInfo2, msg, 1, MessageAck.STANDARD_ACK_TYPE)); + return true; + } + + return false; + } + })); + + assertTrue("broker dest stat forwards", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 1 == destinationStatistics.getForwards().getCount(); + } + })); + + assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount()); + } + + public void initCombosForTestAddConsumerThenSend() { + addCombinationValues("deliveryMode", new Object[] {new Integer(DeliveryMode.NON_PERSISTENT), new Integer(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {new Byte(ActiveMQDestination.QUEUE_TYPE), new Byte(ActiveMQDestination.TOPIC_TYPE)}); + } + + public void testAddConsumerThenSend() throws Exception { + + // Start a producer on local broker + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ProducerInfo producerInfo = createProducerInfo(sessionInfo1); + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.send(producerInfo); + + destination = createDestinationInfo(connection1, connectionInfo1, destinationType); + + // Start a consumer on a remote broker + StubConnection connection2 = createRemoteConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination); + connection2.send(consumerInfo); + + // Give demand forwarding bridge a chance to finish forwarding 
the + // subscriptions. + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + ie.printStackTrace(); + } + + // Send the message to the local boker. + connection1.request(createMessage(producerInfo, destination, deliveryMode)); + // Make sure the message was delivered via the remote. + receiveMessage(connection2); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); + config.setBrokerName("local"); + config.setDispatchAsync(false); + bridge = new DemandForwardingBridge(config, createTransport(), createRemoteTransport()); + bridge.setBrokerService(broker); + bridge.start(); + } + + @Override + protected void tearDown() throws Exception { + bridge.stop(); + super.tearDown(); + } + + public static Test suite() { + return suite(DemandForwardingBridgeTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +}
