Author: gtully
Date: Wed Jul 28 10:43:27 2010
New Revision: 980014
URL: http://svn.apache.org/viewvc?rev=980014&view=rev
Log:
apply patch from https://issues.apache.org/activemq/browse/AMQ-2774
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
(with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
(with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
(with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
(with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
(with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
(with props)
activemq/trunk/activemq-core/src/test/resources/META-INF/services/
activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/
activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/
activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/
activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/
activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=980014&r1=980013&r2=980014&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Wed Jul 28 10:43:27 2010
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +38,7 @@ import javax.transaction.xa.XAResource;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
@@ -80,7 +82,6 @@ import org.apache.activemq.state.Consume
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.state.TransactionState;
-import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -88,6 +89,7 @@ import org.apache.activemq.transaction.T
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
@@ -96,7 +98,7 @@ import org.apache.activemq.util.URISuppo
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import static org.apache.activemq.thread.DefaultThreadPools.*;
+import static
org.apache.activemq.thread.DefaultThreadPools.getDefaultTaskRunnerFactory;
/**
* @version $Revision: 1.8 $
*/
@@ -149,6 +151,7 @@ public class TransportConnection impleme
private final TaskRunnerFactory taskRunnerFactory;
private TransportConnectionStateRegister connectionStateRegister = new
SingleTransportConnectionStateRegister();
private final ReentrantReadWriteLock serviceLock = new
ReentrantReadWriteLock();
+ private BrokerId duplexRemoteBrokerId;
/**
* @param connector
@@ -1178,6 +1181,20 @@ public class TransportConnection impleme
// so this TransportConnection is the rear end of a network bridge
// We have been requested to create a two way pipe ...
try {
+ // First check whether a network connection already exists
for the same broker Id.
+ // After a brief network fault, this transport connector side
of the connection may still be active
+ // while the duplex network connector side wants to open a new
one.
+ // In this case, the old connection must be broken.
+ BrokerId remoteBrokerId = info.getBrokerId();
+ setDuplexRemoteBrokerId(remoteBrokerId);
+ CopyOnWriteArrayList<TransportConnection> connections =
this.connector.getConnections();
+ for (Iterator<TransportConnection> iter =
connections.iterator(); iter.hasNext();) {
+ TransportConnection c = iter.next();
+ if ((c != this) &&
(remoteBrokerId.equals(c.getDuplexRemoteBrokerId()))) {
+ LOG.warn("An existing duplex active connection already
exists for this broker (" + remoteBrokerId + "). Stopping it.");
+ c.stop();
+ }
+ }
Properties properties =
MarshallingSupport.stringToProperties(info.getNetworkProperties());
Map<String, String> props = createMap(properties);
NetworkBridgeConfiguration config = new
NetworkBridgeConfiguration();
@@ -1198,6 +1215,9 @@ public class TransportConnection impleme
duplexBridge.duplexStart(this, brokerInfo, info);
LOG.info("Created Duplex Bridge back to " +
info.getBrokerName());
return null;
+ } catch (TransportDisposedIOException e) {
+ LOG.warn("Duplex Bridge back to " + info.getBrokerName() + "
was correctly stopped before it was correctly started.");
+ return null;
} catch (Exception e) {
LOG.error("Creating duplex network bridge", e);
}
@@ -1391,4 +1411,12 @@ public class TransportConnection impleme
protected synchronized TransportConnectionState
lookupConnectionState(ConnectionId connectionId) {
return connectionStateRegister.lookupConnectionState(connectionId);
}
+
+ protected synchronized void setDuplexRemoteBrokerId(BrokerId
remoteBrokerId) {
+ this.duplexRemoteBrokerId = remoteBrokerId;
+ }
+
+ protected synchronized BrokerId getDuplexRemoteBrokerId() {
+ return this.duplexRemoteBrokerId;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=980014&r1=980013&r2=980014&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Wed Jul 28 10:43:27 2010
@@ -233,14 +233,19 @@ public abstract class DemandForwardingBr
// initiator side of duplex network
remoteBrokerNameKnownLatch.await();
}
- try {
- triggerRemoteStartBridge();
- } catch (IOException e) {
- LOG.warn("Caught exception from remote start", e);
- }
- NetworkBridgeListener l = this.networkBridgeListener;
- if (l != null) {
- l.onStart(this);
+ if (!disposed.get()) {
+ try {
+ triggerRemoteStartBridge();
+ } catch (IOException e) {
+ LOG.warn("Caught exception from remote start", e);
+ }
+ NetworkBridgeListener l = this.networkBridgeListener;
+ if (l != null) {
+ l.onStart(this);
+ }
+ } else {
+ LOG.warn ("Bridge was disposed before the start() method was
fully executed.");
+ throw new TransportDisposedIOException();
}
}
}
@@ -285,30 +290,38 @@ public abstract class DemandForwardingBr
}
remoteBrokerNameKnownLatch.await();
- localConnectionInfo = new ConnectionInfo();
- localConnectionInfo.setConnectionId(new
ConnectionId(idGenerator.generateId()));
- localClientId = "NC_" + remoteBrokerName + "_inbound_" +
configuration.getBrokerName();
- localConnectionInfo.setClientId(localClientId);
- localConnectionInfo.setUserName(configuration.getUserName());
- localConnectionInfo.setPassword(configuration.getPassword());
- Transport originalTransport = remoteBroker;
- while (originalTransport instanceof TransportFilter) {
- originalTransport = ((TransportFilter)
originalTransport).getNext();
- }
- if (originalTransport instanceof SslTransport) {
- X509Certificate[] peerCerts = ((SslTransport)
originalTransport).getPeerCertificates();
- localConnectionInfo.setTransportContext(peerCerts);
- }
- localBroker.oneway(localConnectionInfo);
+ if (!disposed.get()) {
+ localConnectionInfo = new ConnectionInfo();
+ localConnectionInfo.setConnectionId(new
ConnectionId(idGenerator.generateId()));
+ localClientId = "NC_" + remoteBrokerName + "_inbound_" +
configuration.getBrokerName();
+ localConnectionInfo.setClientId(localClientId);
+
localConnectionInfo.setUserName(configuration.getUserName());
+
localConnectionInfo.setPassword(configuration.getPassword());
+ Transport originalTransport = remoteBroker;
+ while (originalTransport instanceof TransportFilter) {
+ originalTransport = ((TransportFilter)
originalTransport).getNext();
+ }
+ if (originalTransport instanceof SslTransport) {
+ X509Certificate[] peerCerts = ((SslTransport)
originalTransport).getPeerCertificates();
+ localConnectionInfo.setTransportContext(peerCerts);
+ }
+ localBroker.oneway(localConnectionInfo);
- localSessionInfo = new SessionInfo(localConnectionInfo, 1);
- localBroker.oneway(localSessionInfo);
+ localSessionInfo = new SessionInfo(localConnectionInfo, 1);
+ localBroker.oneway(localSessionInfo);
- LOG.info("Network connection between " + localBroker + " and "
+ remoteBroker + "(" + remoteBrokerName + ") has been established.");
+ LOG.info("Network connection between " + localBroker + "
and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
+ } else {
+ LOG.warn ("Bridge was disposed before the
startLocalBridge() method was fully executed.");
+ }
startedLatch.countDown();
localStartedLatch.countDown();
- setupStaticDestinations();
+ if (!disposed.get()) {
+ setupStaticDestinations();
+ } else {
+ LOG.warn("Network connection between " + localBroker + "
and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during
establishment.");
+ }
}
}
}
@@ -408,6 +421,7 @@ public abstract class DemandForwardingBr
}
}
LOG.info(configuration.getBrokerName() + " bridge to " +
remoteBrokerName + " stopped");
+ remoteBrokerNameKnownLatch.countDown();
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=980014&r1=980013&r2=980014&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
Wed Jul 28 10:43:27 2010
@@ -21,13 +21,13 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
+import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.activemq.util.IntrospectionSupport;
@@ -128,6 +128,8 @@ public class DiscoveryNetworkConnector e
try {
bridge.start();
bridges.put(uri, bridge);
+ } catch (TransportDisposedIOException e) {
+ LOG.warn("Network bridge between: " + localURI + " and: " +
uri + " was correctly stopped before it was correctly started.");
} catch (Exception e) {
ServiceSupport.dispose(localTransport);
ServiceSupport.dispose(remoteTransport);
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java?rev=980014&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
Wed Jul 28 10:43:27 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.util.Random;
+import javax.net.ServerSocketFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ */
+public class ServerSocketTstFactory extends ServerSocketFactory {
+ private static final Log LOG =
LogFactory.getLog(ServerSocketTstFactory.class);
+
+ private class ServerSocketTst {
+
+ private final ServerSocket socket;
+
+ public ServerSocketTst(int port, Random rnd) throws IOException {
+ this.socket =
ServerSocketFactory.getDefault().createServerSocket(port);
+ }
+
+ public ServerSocketTst(int port, int backlog, Random rnd) throws
IOException {
+ this.socket =
ServerSocketFactory.getDefault().createServerSocket(port, backlog);
+ }
+
+ public ServerSocketTst(int port, int backlog, InetAddress bindAddr,
Random rnd) throws IOException {
+ this.socket =
ServerSocketFactory.getDefault().createServerSocket(port, backlog, bindAddr);
+ }
+
+ public ServerSocket getSocket() {
+ return this.socket;
+ }
+ };
+
+ private final Random rnd;
+
+ public ServerSocketTstFactory() {
+ super();
+ LOG.info("Creating a new ServerSocketTstFactory");
+ this.rnd = new Random();
+ }
+
+ public ServerSocket createServerSocket(int port) throws IOException {
+ ServerSocketTst sSock = new ServerSocketTst(port, this.rnd);
+ return sSock.getSocket();
+ }
+
+ public ServerSocket createServerSocket(int port, int backlog) throws
IOException {
+ ServerSocketTst sSock = new ServerSocketTst(port, backlog, this.rnd);
+ return sSock.getSocket();
+ }
+
+ public ServerSocket createServerSocket(int port, int backlog, InetAddress
ifAddress) throws IOException {
+ ServerSocketTst sSock = new ServerSocketTst(port, backlog, ifAddress,
this.rnd);
+ return sSock.getSocket();
+ }
+
+ private final static ServerSocketTstFactory server = new
ServerSocketTstFactory();
+
+ public static ServerSocketTstFactory getDefault() {
+ return server;
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
------------------------------------------------------------------------------
svn:executable = *
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java?rev=980014&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
Wed Jul 28 10:43:27 2010
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.net.SocketFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision$
+ *
+ * Automatically generated socket.close() calls to simulate network faults
+ */
+public class SocketTstFactory extends SocketFactory {
+ private static final Log LOG = LogFactory.getLog(SocketTstFactory.class);
+
+ private static final ConcurrentHashMap<InetAddress, Integer>
closeIter = new ConcurrentHashMap<InetAddress, Integer>();
+
+ private class SocketTst {
+
+ private class Bagot implements Runnable {
+ private Thread processus;
+ private Random rnd;
+ private Socket socket;
+ private final InetAddress address;
+
+ public Bagot(Random rnd, Socket socket, InetAddress address) {
+ this.processus = new Thread(this, "Network Faults
maker : undefined");
+ this.rnd = rnd;
+ this.socket = socket;
+ this.address = address;
+ }
+
+ public void start() {
+ this.processus.setName("Network Faults maker : " +
this.socket.toString());
+ this.processus.start();
+ }
+
+ public void run () {
+ int lastDelayVal;
+ Integer lastDelay;
+ while (!this.processus.isInterrupted()) {
+ if (!this.socket.isClosed()) {
+ try {
+ lastDelay =
closeIter.get(this.address);
+ if (lastDelay == null) {
+ lastDelayVal = 0;
+ }
+ else {
+ lastDelayVal =
lastDelay.intValue();
+ if (lastDelayVal > 10)
+ lastDelayVal +=
20;
+ else lastDelayVal +=
1;
+ }
+
+ lastDelay = new
Integer(lastDelayVal);
+
+ LOG.info("Trying to close
client socket " + socket.toString() + " in " + lastDelayVal + " milliseconds");
+
+ try {
+
Thread.sleep(lastDelayVal);
+ } catch (InterruptedException
e) {
+
this.processus.interrupt();
+
Thread.currentThread().interrupt();
+ } catch
(IllegalArgumentException e) {
+ }
+
+ this.socket.close();
+ closeIter.put(this.address,
lastDelay);
+ LOG.info("Client socket " +
this.socket.toString() + " is closed.");
+ } catch (IOException e) {
+ }
+ }
+
+ this.processus.interrupt();
+ }
+ }
+ }
+
+ private final Bagot bagot;
+ private final Socket socket;
+
+ public SocketTst(InetAddress address, int port, Random rnd) throws
IOException {
+ this.socket = new Socket(address, port);
+ bagot = new Bagot(rnd, this.socket, address);
+ }
+
+ public SocketTst(InetAddress address, int port, InetAddress localAddr,
int localPort, Random rnd) throws IOException {
+ this.socket = new Socket(address, port, localAddr, localPort);
+ bagot = new Bagot(rnd, this.socket, address);
+ }
+
+ public SocketTst(String address, int port, Random rnd) throws
UnknownHostException, IOException {
+ this.socket = new Socket(address, port);
+ bagot = new Bagot(rnd, this.socket,
InetAddress.getByName(address));
+ }
+
+ public SocketTst(String address, int port, InetAddress localAddr, int
localPort, Random rnd) throws IOException {
+ this.socket = new Socket(address, port, localAddr, localPort);
+ bagot = new Bagot(rnd, this.socket,
InetAddress.getByName(address));
+ }
+
+ public Socket getSocket() {
+ return this.socket;
+ }
+
+ public void startBagot() {
+ bagot.start();
+ }
+ };
+
+ private final Random rnd;
+
+ public SocketTstFactory() {
+ super();
+ LOG.info("Creating a new SocketTstFactory");
+ this.rnd = new Random();
+ }
+
+ public Socket createSocket(InetAddress host, int port) throws IOException {
+ SocketTst sockTst;
+ sockTst = new SocketTst(host, port, this.rnd);
+ sockTst.startBagot();
+ return sockTst.getSocket();
+ }
+
+ public Socket createSocket(InetAddress host, int port, InetAddress
localAddress, int localPort) throws IOException {
+ SocketTst sockTst;
+ sockTst = new SocketTst(host, port, localAddress, localPort, this.rnd);
+ sockTst.startBagot();
+ return sockTst.getSocket();
+ }
+
+ public Socket createSocket(String host, int port) throws IOException {
+ SocketTst sockTst;
+ sockTst = new SocketTst(host, port, this.rnd);
+ sockTst.startBagot();
+ return sockTst.getSocket();
+ }
+
+ public Socket createSocket(String host, int port, InetAddress
localAddress, int localPort) throws IOException {
+ SocketTst sockTst;
+ sockTst = new SocketTst(host, port, localAddress, localPort, this.rnd);
+ sockTst.startBagot();
+ return sockTst.getSocket();
+ }
+
+ private final static SocketTstFactory client = new SocketTstFactory();
+
+ public static SocketFactory getDefault() {
+ return client;
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
------------------------------------------------------------------------------
svn:executable = *
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java?rev=980014&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
Wed Jul 28 10:43:27 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+
+import org.apache.activemq.wireformat.WireFormat;
+
+import javax.net.SocketFactory;
+
+/**
+ * An implementation of the {@link Transport} interface using raw tcp/ip
+ *
+ * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging
improvement modifications)
+ * @version $Revision$
+ */
+public class TcpFaultyTransport extends TcpTransport implements Transport,
Service, Runnable {
+
+ public TcpFaultyTransport(WireFormat wireFormat, SocketFactory
socketFactory, URI remoteLocation,
+ URI localLocation) throws UnknownHostException,
IOException {
+ super(wireFormat, socketFactory, remoteLocation, localLocation);
+ }
+
+ /**
+ * @return pretty print of 'this'
+ */
+ public String toString() {
+ return "tcpfaulty://" + socket.getInetAddress() + ":" +
socket.getPort();
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
------------------------------------------------------------------------------
svn:executable = *
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java?rev=980014&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
Wed Jul 28 10:43:27 2010
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.InactivityMonitor;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportLoggerFactory;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.WireFormatNegotiator;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.net.Socket;
+import java.net.ServerSocket;
+import java.net.InetAddress;
+
+import org.apache.activemq.transport.tcp.ServerSocketTstFactory;
+
+/**
+ * Automatically generated socket.close() calls to simulate network faults
+ */
+public class TcpFaultyTransportFactory extends TcpTransportFactory {
+ private static final Log LOG =
LogFactory.getLog(TcpFaultyTransportFactory.class);
+
+ protected TcpFaultyTransport createTcpFaultyTransport(WireFormat wf,
SocketFactory socketFactory, URI location, URI localLocation) throws
UnknownHostException, IOException {
+ return new TcpFaultyTransport(wf, socketFactory, location,
localLocation);
+ }
+
+ protected Transport createTransport(URI location, WireFormat wf) throws
UnknownHostException, IOException {
+ URI localLocation = null;
+ String path = location.getPath();
+ // see if the path is a local URI location
+ if (path != null && path.length() > 0) {
+ int localPortIndex = path.indexOf(':');
+ try {
+ Integer.parseInt(path.substring(localPortIndex + 1,
path.length()));
+ String localString = location.getScheme() + ":/" + path;
+ localLocation = new URI(localString);
+ } catch (Exception e) {
+ LOG.warn("path isn't a valid local location for TcpTransport
to use", e);
+ }
+ }
+ SocketFactory socketFactory = createSocketFactory();
+ return createTcpFaultyTransport(wf, socketFactory, location,
localLocation);
+ }
+
+ protected TcpFaultyTransportServer createTcpFaultyTransportServer(final
URI location, ServerSocketFactory serverSocketFactory) throws IOException,
URISyntaxException {
+ return new TcpFaultyTransportServer(this, location,
serverSocketFactory);
+ }
+
+ public TransportServer doBind(final URI location) throws IOException {
+ try {
+ Map<String, String> options = new HashMap<String,
String>(URISupport.parseParamters(location));
+
+ ServerSocketFactory serverSocketFactory =
createServerSocketFactory();
+ TcpFaultyTransportServer server =
createTcpFaultyTransportServer(location, serverSocketFactory);
+ server.setWireFormatFactory(createWireFormatFactory(options));
+ IntrospectionSupport.setProperties(server, options);
+ Map<String, Object> transportOptions =
IntrospectionSupport.extractProperties(options, "transport.");
+ server.setTransportOption(transportOptions);
+ server.bind();
+
+ return server;
+ } catch (URISyntaxException e) {
+ throw IOExceptionSupport.create(e);
+ }
+ }
+
+
+ protected SocketFactory createSocketFactory() throws IOException {
+ return SocketTstFactory.getDefault();
+ }
+
+
+ protected ServerSocketFactory createServerSocketFactory() throws
IOException {
+ return ServerSocketTstFactory.getDefault();
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
------------------------------------------------------------------------------
svn:executable = *
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java?rev=980014&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
Wed Jul 28 10:43:27 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import org.apache.activemq.util.ServiceListener;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+
+import javax.net.ServerSocketFactory;
+
+/**
+ * A {@link TcpTransportServer} variant that is constructed from a
+ * {@link TcpFaultyTransportFactory}.
+ * <p>
+ * The class adds no behaviour of its own beyond delegating construction to the
+ * parent and printing the bind location in {@link #toString()}.
+ *
+ * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
+ * @version $Revision$
+ */
+public class TcpFaultyTransportServer extends TcpTransportServer implements ServiceListener {
+
+    /**
+     * Creates the server by passing all arguments straight to the
+     * {@link TcpTransportServer} constructor.
+     *
+     * @param transportFactory the factory that is creating this server
+     * @param location the URI handed to the parent constructor
+     * @param serverSocketFactory the server socket factory handed to the parent constructor
+     * @throws IOException if the parent constructor fails with an I/O error
+     * @throws URISyntaxException if the parent constructor rejects the location
+     */
+    public TcpFaultyTransportServer(TcpFaultyTransportFactory transportFactory, URI location,
+            ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+        super(transportFactory, location, serverSocketFactory);
+    }
+
+    /**
+     * @return the bind location rendered as a string ("null" when no location is set)
+     */
+    @Override
+    public String toString() {
+        // String.valueOf(Object) yields the same text as "" + object, including "null".
+        return String.valueOf(getBindLocation());
+    }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
------------------------------------------------------------------------------
svn:executable = *
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java?rev=980014&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
Wed Jul 28 10:43:27 2010
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.List;
+
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import junit.framework.Test;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.SocketProxy;
+
+
+public class MulticastDiscoveryOnFaultyNetworkTest extends
JmsMultipleBrokersTestSupport {
+ protected static final int MESSAGE_COUNT = 200;
+ private static final String HUB = "HubBroker";
+ private static final String SPOKE = "SpokeBroker";
+ public boolean useDuplexNetworkBridge;
+ public boolean sumulateStalledNetwork;
+
+ private TransportConnector mCastTrpConnector;
+
+ public void initCombosForTestSendOnAFaultyTransport() {
+ addCombinationValues( "useDuplexNetworkBridge", new Object[]{
Boolean.TRUE , Boolean.FALSE } );
+ addCombinationValues( "sumulateStalledNetwork", new Object[]{
Boolean.TRUE } );
+ }
+
+ public void testSendOnAFaultyTransport() throws Exception {
+ bridgeBrokers(SPOKE, HUB);
+
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", false);
+
+ // Setup consumers
+ MessageConsumer client = createConsumer(HUB, dest);
+
+ // allow subscription information to flow back to Spoke
+ sleep(600);
+
+ // Send messages
+ sendMessages(SPOKE, dest, MESSAGE_COUNT);
+
+ MessageIdList msgs = getConsumerMessages(HUB, client);
+ msgs.setMaximumDuration(200000L);
+ msgs.waitForMessagesToArrive(MESSAGE_COUNT);
+
+ assertTrue("At least message " + MESSAGE_COUNT +
+ " must be recieved, duplicates are expected, count=" +
msgs.getMessageCount(),
+ MESSAGE_COUNT <= msgs.getMessageCount());
+ }
+
+
+ @Override
+ protected void startAllBrokers() throws Exception {
+ // Ensure HUB is started first so bridge will be active from the get go
+ BrokerItem brokerItem = brokers.get(HUB);
+ brokerItem.broker.start();
+ brokerItem = brokers.get(SPOKE);
+ brokerItem.broker.start();
+ }
+
+ public void setUp() throws Exception {
+ super.setAutoFail(true);
+ super.setUp();
+ final String options =
"?persistent=false&useJmx=false&deleteAllMessagesOnStartup=true";
+ createBroker(new URI("broker:(tcpfaulty://localhost:61617)/" + HUB +
options));
+ createBroker(new URI("broker:(tcpfaulty://localhost:61616)/" + SPOKE +
options));
+ }
+
+ public static Test suite() {
+ return suite(MulticastDiscoveryOnFaultyNetworkTest.class);
+ }
+
+ @Override
+ protected void onSend(int i, TextMessage msg) {
+ sleep(50);
+ }
+
+ private void sleep(int milliSecondTime) {
+ try {
+ Thread.sleep(milliSecondTime);
+ } catch (InterruptedException igonred) {
+ }
+ }
+
+
+ @Override
+ protected NetworkConnector bridgeBrokers(BrokerService localBroker,
BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean
conduit) throws Exception {
+ DiscoveryNetworkConnector connector = new
DiscoveryNetworkConnector(new URI("multicast://default?group=TESTERIC"));
+ connector.setDynamicOnly(dynamicOnly);
+ connector.setNetworkTTL(networkTTL);
+ localBroker.addNetworkConnector(connector);
+ maxSetupTime = 2000;
+ if (useDuplexNetworkBridge) {
+ connector.setDuplex(true);
+ }
+
+ List<TransportConnector> transportConnectors =
remoteBroker.getTransportConnectors();
+ if (!transportConnectors.isEmpty()) {
+ mCastTrpConnector =
((TransportConnector)transportConnectors.get(0));
+ mCastTrpConnector.setDiscoveryUri(new
URI("multicast://default?group=TESTERIC"));
+ }
+ return connector;
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty?rev=980014&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty
(added)
+++
activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty
Wed Jul 28 10:43:27 2010
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.tcp.TcpFaultyTransportFactory