Author: gtully
Date: Tue Oct 18 17:31:27 2011
New Revision: 1185765
URL: http://svn.apache.org/viewvc?rev=1185765&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3542 - further simplification of
network bridge start, remove latch wait states, add duplex test variant
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1185765&r1=1185764&r2=1185765&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Tue Oct 18 17:31:27 2011
@@ -534,7 +534,7 @@ public class BrokerService implements Se
getBroker().brokerServiceStarted();
startedLatch.countDown();
} catch (Exception e) {
- LOG.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e);
+ LOG.error("Failed to start ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + "). Reason: " + e, e);
try {
if (!stopped.get()) {
stop();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java?rev=1185765&r1=1185764&r2=1185765&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
Tue Oct 18 17:31:27 2011
@@ -18,14 +18,8 @@ package org.apache.activemq.network;
import java.io.IOException;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Endpoint;
-import org.apache.activemq.command.NetworkBridgeFilter;
import org.apache.activemq.transport.Transport;
-import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,71 +34,13 @@ import org.slf4j.LoggerFactory;
public class CompositeDemandForwardingBridge extends
DemandForwardingBridgeSupport {
private static final Logger LOG =
LoggerFactory.getLogger(CompositeDemandForwardingBridge.class);
- protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
- protected Object brokerInfoMutex = new Object();
-
public CompositeDemandForwardingBridge(NetworkBridgeConfiguration
configuration, Transport localBroker,
Transport remoteBroker) {
super(configuration, localBroker, remoteBroker);
remoteBrokerName = remoteBroker.toString();
- remoteBrokerNameKnownLatch.countDown();
- }
-
- protected void serviceRemoteBrokerInfo(Command command) throws IOException
{
- synchronized (brokerInfoMutex) {
- BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
- BrokerId remoteBrokerId = remoteBrokerInfo.getBrokerId();
-
- // lets associate the incoming endpoint with a broker ID so we can
- // refer to it later
- Endpoint from = command.getFrom();
- if (from == null) {
- LOG.warn("Incoming command does not have a from endpoint: " +
command);
- } else {
- from.setBrokerInfo(remoteBrokerInfo);
- }
- if (localBrokerId != null) {
- if (localBrokerId.equals(remoteBrokerId)) {
- LOG.info("Disconnecting loop back connection.");
- // waitStarted();
- ServiceSupport.dispose(this);
- }
- }
- if (!disposed.get()) {
- triggerLocalStartBridge();
- }
- }
- }
-
- protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws
IOException {
- info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),
getFromBrokerId(info)));
- }
-
- /**
- * Returns the broker ID that the command came from
- */
- protected BrokerId getFromBrokerId(Command command) throws IOException {
- BrokerId answer = null;
- Endpoint from = command.getFrom();
- if (from == null) {
- LOG.warn("Incoming command does not have a from endpoint: " +
command);
- } else {
- answer = from.getBrokerId();
- }
- if (answer != null) {
- return answer;
- } else {
- throw new IOException("No broker ID is available for endpoint: " +
from + " from command: "
- + command);
- }
}
protected void serviceLocalBrokerInfo(Command command) throws
InterruptedException {
// TODO is there much we can do here?
}
-
- protected BrokerId[] getRemoteBrokerPath() {
- return remoteBrokerPath;
- }
-
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=1185765&r1=1185764&r2=1185765&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Tue Oct 18 17:31:27 2011
@@ -16,15 +16,7 @@
*/
package org.apache.activemq.network;
-import java.io.IOException;
-
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.NetworkBridgeFilter;
import org.apache.activemq.transport.Transport;
-import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,58 +30,8 @@ import org.slf4j.LoggerFactory;
public class DemandForwardingBridge extends DemandForwardingBridgeSupport {
private static final Logger LOG =
LoggerFactory.getLogger(DemandForwardingBridge.class);
- protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
- protected Object brokerInfoMutex = new Object();
- protected BrokerId remoteBrokerId;
-
public DemandForwardingBridge(NetworkBridgeConfiguration configuration,
Transport localBroker,
Transport remoteBroker) {
super(configuration, localBroker, remoteBroker);
}
-
- protected void serviceRemoteBrokerInfo(Command command) throws IOException
{
- synchronized (brokerInfoMutex) {
- BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
- remoteBrokerId = remoteBrokerInfo.getBrokerId();
- remoteBrokerPath[0] = remoteBrokerId;
- remoteBrokerName = remoteBrokerInfo.getBrokerName();
- if (localBrokerId != null) {
- if (localBrokerId.equals(remoteBrokerId)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(configuration.getBrokerName() + "
disconnecting remote loop back connection: " + remoteBrokerName);
- }
- ServiceSupport.dispose(this);
- }
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("counting down remoteBrokerNameKnownLatch with: " +
command);
- }
- remoteBrokerNameKnownLatch.countDown();
- }
- }
-
- protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) {
- info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),
getRemoteBrokerPath()));
- }
-
- protected void serviceLocalBrokerInfo(Command command) throws
InterruptedException {
- synchronized (brokerInfoMutex) {
- localBrokerId = ((BrokerInfo)command).getBrokerId();
- localBrokerPath[0] = localBrokerId;
- localBrokerIdKnownLatch.countDown();
- if (remoteBrokerId != null) {
- if (remoteBrokerId.equals(localBrokerId)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(configuration.getBrokerName() + "
disconnecting local loop back connection.");
- }
- waitStarted();
- ServiceSupport.dispose(this);
- }
- }
- }
- }
-
- protected BrokerId[] getRemoteBrokerPath() {
- return remoteBrokerPath;
- }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1185765&r1=1185764&r2=1185765&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Tue Oct 18 17:31:27 2011
@@ -36,7 +36,6 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@@ -74,7 +73,6 @@ import org.apache.activemq.transport.Res
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFilter;
-import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.*;
import org.slf4j.Logger;
@@ -114,12 +112,14 @@ public abstract class DemandForwardingBr
protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
protected CountDownLatch startedLatch = new CountDownLatch(2);
protected CountDownLatch localStartedLatch = new CountDownLatch(1);
- protected CountDownLatch remoteBrokerNameKnownLatch = new
CountDownLatch(1);
- protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1);
protected final AtomicBoolean lastConnectSucceeded = new
AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration;
protected NetworkBridgeFilterFactory filterFactory;
+ protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
+ protected Object brokerInfoMutex = new Object();
+ protected BrokerId remoteBrokerId;
+
final AtomicLong enqueueCounter = new AtomicLong();
final AtomicLong dequeueCounter = new AtomicLong();
@@ -222,14 +222,12 @@ public abstract class DemandForwardingBr
});
}
- protected void startLocalBridge() throws Throwable {
+ private void startLocalBridge() throws Throwable {
if (localBridgeStarted.compareAndSet(false, true)) {
synchronized (this) {
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " starting local
Bridge, localBroker=" + localBroker);
}
- remoteBrokerNameKnownLatch.await();
-
if (!disposed.get()) {
localConnectionInfo = new ConnectionInfo();
localConnectionInfo.setConnectionId(new
ConnectionId(idGenerator.generateId()));
@@ -277,7 +275,7 @@ public abstract class DemandForwardingBr
protected void startRemoteBridge() throws Exception {
if (remoteBridgeStarted.compareAndSet(false, true)) {
if (LOG.isTraceEnabled()) {
- LOG.trace(configuration.getBrokerName() + " starting remote
Bridge, localBroker=" + localBroker);
+ LOG.trace(configuration.getBrokerName() + " starting remote
Bridge, remoteBroker=" + remoteBroker);
}
synchronized (this) {
if (!isCreatedByDuplex()) {
@@ -291,7 +289,6 @@ public abstract class DemandForwardingBr
IntrospectionSupport.getProperties(configuration, props,
null);
String str = MarshallingSupport.propertiesToString(props);
brokerInfo.setNetworkProperties(str);
- localBrokerIdKnownLatch.await();
brokerInfo.setBrokerId(this.localBrokerId);
remoteBroker.oneway(brokerInfo);
}
@@ -322,9 +319,6 @@ public abstract class DemandForwardingBr
demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
remoteBroker.oneway(demandConsumerInfo);
startedLatch.countDown();
- if (!disposed.get()) {
- triggerLocalStartBridge();
- }
}
}
}
@@ -372,7 +366,6 @@ public abstract class DemandForwardingBr
brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
LOG.info(configuration.getBrokerName() + " bridge to " +
remoteBrokerName + " stopped");
- remoteBrokerNameKnownLatch.countDown();
}
}
@@ -1161,7 +1154,6 @@ public abstract class DemandForwardingBr
protected void waitStarted() throws InterruptedException {
startedLatch.await();
- localBrokerIdKnownLatch.await();
}
protected void clearDownSubscriptions() {
@@ -1184,13 +1176,47 @@ public abstract class DemandForwardingBr
return filterFactory.create(info, getRemoteBrokerPath(),
configuration.getNetworkTTL() );
}
- protected abstract void serviceLocalBrokerInfo(Command command) throws
InterruptedException;
+ protected void serviceLocalBrokerInfo(Command command) throws
InterruptedException {
+ synchronized (brokerInfoMutex) {
+ if (remoteBrokerId != null) {
+ if (remoteBrokerId.equals(localBrokerId)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(configuration.getBrokerName() + "
disconnecting local loop back connection for: " + remoteBrokerName + ", with
id:" + remoteBrokerId);
+ }
+ waitStarted();
+ ServiceSupport.dispose(this);
+ }
+ }
+ }
+ }
- protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info)
throws IOException;
+ protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws
IOException {
+ info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),
getRemoteBrokerPath()));
+ }
- protected abstract void serviceRemoteBrokerInfo(Command command) throws
IOException;
+ protected void serviceRemoteBrokerInfo(Command command) throws IOException
{
+ synchronized (brokerInfoMutex) {
+ BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
+ remoteBrokerId = remoteBrokerInfo.getBrokerId();
+ remoteBrokerPath[0] = remoteBrokerId;
+ remoteBrokerName = remoteBrokerInfo.getBrokerName();
+ if (localBrokerId != null) {
+ if (localBrokerId.equals(remoteBrokerId)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(configuration.getBrokerName() + "
disconnecting remote loop back connection for: " + remoteBrokerName + ", with
id:" + remoteBrokerId);
+ }
+ ServiceSupport.dispose(this);
+ }
+ }
+ if (!disposed.get()) {
+ triggerLocalStartBridge();
+ }
+ }
+ }
- protected abstract BrokerId[] getRemoteBrokerPath();
+ protected BrokerId[] getRemoteBrokerPath() {
+ return remoteBrokerPath;
+ }
public void setNetworkBridgeListener(NetworkBridgeListener listener) {
this.networkBridgeListener = listener;
@@ -1233,6 +1259,8 @@ public abstract class DemandForwardingBr
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
+ this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
+ localBrokerPath[0] = localBrokerId;
}
public void setMbeanObjectName(ObjectName objectName) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=1185765&r1=1185764&r2=1185765&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
Tue Oct 18 17:31:27 2011
@@ -96,10 +96,16 @@ public class DiscoveryNetworkConnector e
return;
}
}
- if ( localURI.equals(uri) || (connectionFilter != null &&
!connectionFilter.connectTo(uri))) {
+ if (localURI.equals(uri)) {
LOG.debug("not connecting loopback: " + uri);
return;
}
+
+ if (connectionFilter != null && !connectionFilter.connectTo(uri)) {
+ LOG.debug("connectionFilter disallows connection to: " + uri);
+ return;
+ }
+
URI connectUri = uri;
try {
connectUri = URISupport.applyParameters(connectUri,
parameters, DISCOVERED_OPTION_PREFIX);
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java?rev=1185765&r1=1185764&r2=1185765&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
Tue Oct 18 17:31:27 2011
@@ -278,7 +278,6 @@ public class FailoverStaticNetworkTest {
brokerC.start();
assertTrue("all props applied a second time",
networkConnectorProps.isEmpty());
- //Thread.sleep(4000);
doTestNetworkSendReceive(brokerC, brokerB);
doTestNetworkSendReceive(brokerB, brokerC);
@@ -321,8 +320,20 @@ public class FailoverStaticNetworkTest {
@Test
public void testRepeatedSendReceiveWithMasterSlaveAlternate() throws
Exception {
+ doTestRepeatedSendReceiveWithMasterSlaveAlternate(null);
+ }
+
+ @Test
+ public void testRepeatedSendReceiveWithMasterSlaveAlternateDuplex() throws
Exception {
+ HashMap<String, String> networkConnectorProps = new HashMap<String,
String>();
+ networkConnectorProps.put("duplex", "true");
+
+
doTestRepeatedSendReceiveWithMasterSlaveAlternate(networkConnectorProps);
+ }
+
+ public void
doTestRepeatedSendReceiveWithMasterSlaveAlternate(HashMap<String, String>
networkConnectorProps) throws Exception {
- brokerB = createBroker("tcp", "62617", new String[]{"61610","61611"});
+ brokerB = createBroker("tcp", "62617", new String[]{"61610","61611"},
networkConnectorProps);
brokerB.start();
final AtomicBoolean done = new AtomicBoolean(false);
@@ -380,7 +391,7 @@ public class FailoverStaticNetworkTest {
}
});
- for (int i=0; i<10; i++) {
+ for (int i=0; i<4; i++) {
BrokerService currentMaster = (i%2 == 0 ? brokerA : brokerA1);
LOG.info("iteration: " + i + ", using: " +
currentMaster.getBrokerObjectName().getKeyProperty("BrokerName"));
currentMaster.waitUntilStarted();
@@ -392,7 +403,7 @@ public class FailoverStaticNetworkTest {
currentMaster.waitUntilStopped();
}
- done.set(false);
+ done.set(true);
LOG.info("all done");
executorService.shutdownNow();
}