Repository: activemq Updated Branches: refs/heads/master 062b0c2c0 -> 310090904
https://issues.apache.org/jira/browse/AMQ-5830 - ensure duplex inbound connection sets network=true flag, fix and test Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/31009090 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/31009090 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/31009090 Branch: refs/heads/master Commit: 3100909041b8cf114773f0d0bf60d8032732186f Parents: 062b0c2 Author: gtully <[email protected]> Authored: Tue Jun 9 11:29:20 2015 +0100 Committer: gtully <[email protected]> Committed: Tue Jun 9 12:19:49 2015 +0100 ---------------------------------------------------------------------- .../apache/activemq/broker/BrokerService.java | 7 +- .../network/DemandForwardingBridgeSupport.java | 3 + .../activemq/network/NetworkBridgeFactory.java | 14 -- .../activemq/transport/vm/VMTransport.java | 9 - .../NetworkBridgeProducerFlowControlTest.java | 230 ++++++++++++++++++- 5 files changed, 232 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/31009090/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index a2a04a0..d285c74 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -404,11 +404,7 @@ public class BrokerService implements Service { */ public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception { connector.setBrokerService(this); - URI uri = getVmConnectorURI(); - Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri)); - map.put("network", "true"); - uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); - connector.setLocalUri(uri); + connector.setLocalUri(getVmConnectorURI()); // Set a connection filter so that the connector does not establish loop // back connections. connector.setConnectionFilter(new ConnectionFilter() { @@ -2499,7 +2495,6 @@ public class BrokerService implements Service { this.slave = false; URI uri = getVmConnectorURI(); Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri)); - map.put("network", "true"); map.put("async", "false"); uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); http://git-wip-us.apache.org/repos/asf/activemq/blob/31009090/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 1b77e73..8ba1d98 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -475,6 +475,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (configuration.isDuplex()) { // separate in-bound channel for forwards so we don't // contend with out-bound dispatch on same connection + remoteBrokerInfo.setNetworkConnection(true); + duplexInboundLocalBroker.oneway(remoteBrokerInfo); + ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo(); duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_" http://git-wip-us.apache.org/repos/asf/activemq/blob/31009090/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java index 41a9d9e..0e938ae 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java @@ -32,19 +32,6 @@ public final class NetworkBridgeFactory { private NetworkBridgeFactory() { } - - /** - * Create a network bridge - * - * @param config - * @param localTransport - * @param remoteTransport - * @return the NetworkBridge - */ - public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration config, - Transport localTransport, Transport remoteTransport) { - return createBridge(config, localTransport, remoteTransport, null); - } /** * create a network bridge @@ -74,7 +61,6 @@ public final class NetworkBridgeFactory { public static Transport createLocalTransport(Broker broker) throws Exception { URI uri = broker.getVmConnectorURI(); HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri)); - map.put("network", "true"); map.put("async", "true"); map.put("create", "false"); // we don't want a vm connect during shutdown to trigger a broker create uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); http://git-wip-us.apache.org/repos/asf/activemq/blob/31009090/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index 6e6726d..92c9c51 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -49,7 +49,6 @@ public class VMTransport implements Transport, Task { protected VMTransport peer; protected TransportListener transportListener; protected boolean marshal; - protected boolean network; protected boolean async = true; protected int asyncQueueDepth = 2000; protected final URI location; @@ -358,14 +357,6 @@ public class VMTransport implements Transport, Task { this.marshal = marshal; } - public boolean isNetwork() { - return network; - } - - public void setNetwork(boolean network) { - this.network = network; - } - @Override public String toString() { return location + "#" + id; http://git-wip-us.apache.org/repos/asf/activemq/blob/31009090/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java index e950b7d..4e1501d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.usecases; +import java.io.IOException; import java.net.URI; import java.util.Vector; import java.util.concurrent.CountDownLatch; @@ -31,7 +32,10 @@ import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.DiscoveryEvent; +import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent; import org.apache.activemq.util.MessageIdList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -242,7 +246,7 @@ public class NetworkBridgeProducerFlowControlTest extends // Verify the behaviour as described in the description of this class. if (networkIsAlwaysSendSync) { Assert - .assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 10); + .assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 20); } else { Assert.assertEquals(persistentTestMessages, @@ -384,4 +388,226 @@ public class NetworkBridgeProducerFlowControlTest extends // Verify the behaviour as described in the description of this class. Assert.assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 10); } -} \ No newline at end of file + + public void testSendFailIfNoSpaceReverseDoesNotBlockQueueNetwork() throws Exception { + final int NUM_MESSAGES = 100; + final long TEST_MESSAGE_SIZE = 1024; + final long SLOW_CONSUMER_DELAY_MILLIS = 100; + + final ActiveMQQueue slowDestination = new ActiveMQQueue( + NetworkBridgeProducerFlowControlTest.class.getSimpleName() + + ".slow.shared?consumer.prefetchSize=1"); + + final ActiveMQQueue fastDestination = new ActiveMQQueue( + NetworkBridgeProducerFlowControlTest.class.getSimpleName() + + ".fast.shared?consumer.prefetchSize=1"); + + + // Start a local and a remote broker. + BrokerService localBroker = createBroker(new URI("broker:(tcp://localhost:0" + + ")?brokerName=broker0&persistent=false&useJmx=true")); + createBroker(new URI( + "broker:(tcp://localhost:0" + + ")?brokerName=broker1&persistent=false&useJmx=true")); + localBroker.getSystemUsage().setSendFailIfNoSpace(true); + + // Set a policy on the local broker that limits the maximum size of the + // slow shared queue. + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setMemoryLimit(5 * TEST_MESSAGE_SIZE); + PolicyMap policyMap = new PolicyMap(); + policyMap.put(slowDestination, policyEntry); + localBroker.setDestinationPolicy(policyMap); + + // Create an outbound bridge from the local broker to the remote broker. + // The bridge is configured with the remoteDispatchType enhancement. + NetworkConnector nc = bridgeBrokers("broker0", "broker1"); + nc.setAlwaysSyncSend(true); + nc.setPrefetchSize(1); + nc.setDuplex(true); + + startAllBrokers(); + waitForBridgeFormation(); + + // Start two asynchronous consumers on the local broker, one for each + // of the two shared queues, and keep track of how long it takes for + // each of the consumers to receive all the messages. + final CountDownLatch fastConsumerLatch = new CountDownLatch( + NUM_MESSAGES); + final CountDownLatch slowConsumerLatch = new CountDownLatch( + NUM_MESSAGES); + + final long startTimeMillis = System.currentTimeMillis(); + final AtomicLong fastConsumerTime = new AtomicLong(); + final AtomicLong slowConsumerTime = new AtomicLong(); + + Thread fastWaitThread = new Thread() { + @Override + public void run() { + try { + fastConsumerLatch.await(); + fastConsumerTime.set(System.currentTimeMillis() + - startTimeMillis); + } catch (InterruptedException ex) { + exceptions.add(ex); + Assert.fail(ex.getMessage()); + } + } + }; + + Thread slowWaitThread = new Thread() { + @Override + public void run() { + try { + slowConsumerLatch.await(); + slowConsumerTime.set(System.currentTimeMillis() + - startTimeMillis); + } catch (InterruptedException ex) { + exceptions.add(ex); + Assert.fail(ex.getMessage()); + } + } + }; + + fastWaitThread.start(); + slowWaitThread.start(); + + createConsumer("broker0", fastDestination, fastConsumerLatch); + MessageConsumer slowConsumer = createConsumer("broker0", + slowDestination, slowConsumerLatch); + MessageIdList messageIdList = brokers.get("broker0").consumers + .get(slowConsumer); + messageIdList.setProcessingDelay(SLOW_CONSUMER_DELAY_MILLIS); + + // Send the test messages to the local broker's shared queues. The + // messages are either persistent or non-persistent to demonstrate the + // difference between synchronous and asynchronous dispatch. + persistentDelivery = false; + sendMessages("broker1", fastDestination, NUM_MESSAGES); + sendMessages("broker1", slowDestination, NUM_MESSAGES); + + fastWaitThread.join(TimeUnit.SECONDS.toMillis(60)); + slowWaitThread.join(TimeUnit.SECONDS.toMillis(60)); + + assertTrue("no exceptions on the wait threads:" + exceptions, + exceptions.isEmpty()); + + LOG.info("Fast consumer duration (ms): " + fastConsumerTime.get()); + LOG.info("Slow consumer duration (ms): " + slowConsumerTime.get()); + + assertTrue("fast time set", fastConsumerTime.get() > 0); + assertTrue("slow time set", slowConsumerTime.get() > 0); + + // Verify the behaviour as described in the description of this class. + Assert.assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 10); + } + + + /** + * create a duplex network bridge from broker0 to broker1 + * add a topic consumer on broker0 + * set the setSendFailIfNoSpace() on the local broker. + * create a SimpleDiscoveryAgent impl that tracks a network reconnect + * + * producer connects to broker1 and messages should be sent across the network to broker0 + * + * Ensure broker0 will not send the javax.jms.ResourceAllocationException (when broker0 runs out of space). + * If the javax.jms.ResourceAllocationException is sent across the wire it will force the network connector + * to shutdown + * + * + * @throws Exception + */ + + public void testDuplexSendFailIfNoSpaceDoesNotBlockNetwork() throws Exception { + + // Consumer prefetch is disabled for broker1's consumers. + final ActiveMQTopic destination = new ActiveMQTopic( + NetworkBridgeProducerFlowControlTest.class.getSimpleName() + + ".duplexTest?consumer.prefetchSize=1"); + + final int NUM_MESSAGES = 100; + final long TEST_MESSAGE_SIZE = 1024; + final long SLOW_CONSUMER_DELAY_MILLIS = 100; + + // Start a local and a remote broker. + BrokerService localBroker = createBroker(new URI("broker:(tcp://localhost:0" + + ")?brokerName=broker0&persistent=false&useJmx=true")); + + BrokerService remoteBroker = createBroker(new URI( + "broker:(tcp://localhost:0" + + ")?brokerName=broker1&persistent=false&useJmx=true")); + + localBroker.getSystemUsage().setSendFailIfNoSpace(true); + + // Set a policy on the remote broker that limits the maximum size of the + // slow shared queue. + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setMemoryLimit(5 * TEST_MESSAGE_SIZE); + PolicyMap policyMap = new PolicyMap(); + policyMap.put(destination, policyEntry); + localBroker.setDestinationPolicy(policyMap); + + // Create a duplex network bridge from the local broker to the remote broker + // create a SimpleDiscoveryAgent impl that tracks a reconnect + DiscoveryNetworkConnector discoveryNetworkConnector = (DiscoveryNetworkConnector)bridgeBrokers("broker0", "broker1"); + URI originURI = discoveryNetworkConnector.getUri(); + discoveryNetworkConnector.setAlwaysSyncSend(true); + discoveryNetworkConnector.setPrefetchSize(1); + discoveryNetworkConnector.setDuplex(true); + + DummySimpleDiscoveryAgent dummySimpleDiscoveryAgent = new DummySimpleDiscoveryAgent(); + dummySimpleDiscoveryAgent.setServices(originURI.toString().substring(8,originURI.toString().lastIndexOf(')'))); + + discoveryNetworkConnector.setDiscoveryAgent(dummySimpleDiscoveryAgent); + + startAllBrokers(); + waitForBridgeFormation(); + + + final CountDownLatch consumerLatch = new CountDownLatch( + NUM_MESSAGES); + + + //createConsumer("broker0", fastDestination, fastConsumerLatch); + + MessageConsumer consumer = createConsumer("broker0", + destination, consumerLatch); + + MessageIdList messageIdList = brokers.get("broker0").consumers + .get(consumer); + + messageIdList.setProcessingDelay(SLOW_CONSUMER_DELAY_MILLIS); + + // Send the test messages to the local broker's shared queues. The + // messages are either persistent or non-persistent to demonstrate the + // difference between synchronous and asynchronous dispatch. + persistentDelivery = false; + sendMessages("broker1", destination, NUM_MESSAGES); + + //wait for 5 seconds for the consumer to complete + consumerLatch.await(5, TimeUnit.SECONDS); + + assertFalse("dummySimpleDiscoveryAgent.serviceFail has been invoked - should not have been", + dummySimpleDiscoveryAgent.isServiceFailed); + + } + + /** + * When the network connector fails it records the failure and delegates to real SimpleDiscoveryAgent + */ + class DummySimpleDiscoveryAgent extends SimpleDiscoveryAgent { + + boolean isServiceFailed = false; + + public void serviceFailed(DiscoveryEvent devent) throws IOException { + + //should never get in here + LOG.info("!!!!! DummySimpleDiscoveryAgent.serviceFailed() invoked with event:"+devent+"!!!!!!"); + isServiceFailed = true; + super.serviceFailed(devent); + + } + + } +}
