https://issues.apache.org/jira/browse/AMQ-4852 - ensure clientId view connector mbean visible for duplex network connectors
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7eff195c Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7eff195c Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7eff195c Branch: refs/heads/activemq-5.9 Commit: 7eff195cb9c1b815453bec9250f43cb8d363c4ff Parents: 9518705 Author: gtully <[email protected]> Authored: Thu Nov 7 11:34:56 2013 +0000 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Mar 12 09:02:29 2014 -0400 ---------------------------------------------------------------------- .../network/DemandForwardingBridgeSupport.java | 8 +++++ .../broker/jmx/TransportConnectorMBeanTest.java | 37 ++++++++++++++++++++ .../network/DuplexNetworkMBeanTest.java | 4 +-- 3 files changed, 47 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/7eff195c/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index ea61472..af10a94 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -119,6 +119,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br private final AtomicBoolean started = new AtomicBoolean(); private TransportConnection duplexInitiatingConnection; + private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean(); protected BrokerService brokerService = null; private ObjectName mbeanObjectName; private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor(); @@ -610,6 +611,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else { switch (command.getDataStructureType()) { case ConnectionInfo.DATA_STRUCTURE_TYPE: + if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) { + // end of initiating connection setup - propogate to initial connection to get mbean by clientid + duplexInitiatingConnection.processAddConnection((ConnectionInfo) command); + } else { + localBroker.oneway(command); + } + break; case SessionInfo.DATA_STRUCTURE_TYPE: localBroker.oneway(command); break; http://git-wip-us.apache.org/repos/asf/activemq/blob/7eff195c/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java index 310f112..6f55e3d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.jmx; +import static junit.framework.TestCase.assertTrue; import static org.junit.Assert.assertEquals; import java.net.Socket; @@ -26,7 +27,9 @@ import javax.management.ObjectName; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.util.JMXSupport; +import org.apache.activemq.util.Wait; import org.junit.After; import org.junit.Test; import org.slf4j.Logger; @@ -47,6 +50,40 @@ public class TransportConnectorMBeanTest { doVerifyRemoteAddressInMbeanName(false); } + @Test + public void verifyClientIdNetwork() throws Exception { + doVerifyClientIdNetwork(false); + } + + @Test + public void verifyClientIdDuplexNetwork() throws Exception { + doVerifyClientIdNetwork(true); + } + + private void doVerifyClientIdNetwork(boolean duplex) throws Exception { + createBroker(true); + + BrokerService networked = new BrokerService(); + networked.setBrokerName("networked"); + networked.setPersistent(false); + NetworkConnector nc = networked.addNetworkConnector("static:" + broker.getTransportConnectors().get(0).getPublishableConnectString()); + nc.setDuplex(duplex); + networked.start(); + + try { + assertTrue("presence of mbean with clientId", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Set<ObjectName> registeredMbeans = getRegisteredMbeans(); + return match("_outbound", registeredMbeans); + } + })); + + } finally { + networked.stop(); + } + } + private void doVerifyRemoteAddressInMbeanName(boolean allowRemoteAddress) throws Exception { createBroker(allowRemoteAddress); ActiveMQConnection connection = createConnection(); http://git-wip-us.apache.org/repos/asf/activemq/blob/7eff195c/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java index cf02bb2..213d4ae 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java @@ -66,7 +66,7 @@ public class DuplexNetworkMBeanTest { networkedBroker.start(); assertEquals(1, countMbeans(networkedBroker, "networkBridge", 2000)); assertEquals(1, countMbeans(broker, "networkBridge", 2000)); - assertEquals(1, countMbeans(broker, "connectionName")); + assertEquals(2, countMbeans(broker, "connectionName")); } finally { networkedBroker.stop(); networkedBroker.waitUntilStopped(); @@ -100,7 +100,7 @@ public class DuplexNetworkMBeanTest { try { broker.start(); assertEquals(1, countMbeans(networkedBroker, "networkBridge", 5000)); - assertEquals("restart number: " + i, 1, countMbeans(broker, "connectionName", 10000)); + assertEquals("restart number: " + i, 2, countMbeans(broker, "connectionName", 10000)); } finally { broker.stop(); broker.waitUntilStopped();
