Author: gtully
Date: Wed Jul 28 10:43:27 2010
New Revision: 980014

URL: http://svn.apache.org/viewvc?rev=980014&view=rev
Log:
apply patch from https://issues.apache.org/activemq/browse/AMQ-2774

Added:
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
   (with props)
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
   (with props)
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
   (with props)
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
   (with props)
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
   (with props)
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
   (with props)
    activemq/trunk/activemq-core/src/test/resources/META-INF/services/
    activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/
    
activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/
    
activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/
    
activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/
    
activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty
Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=980014&r1=980013&r2=980014&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
 Wed Jul 28 10:43:27 2010
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +38,7 @@ import javax.transaction.xa.XAResource;
 import org.apache.activemq.broker.ft.MasterBroker;
 import org.apache.activemq.broker.region.ConnectionStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.CommandTypes;
@@ -80,7 +82,6 @@ import org.apache.activemq.state.Consume
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.state.SessionState;
 import org.apache.activemq.state.TransactionState;
-import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -88,6 +89,7 @@ import org.apache.activemq.transaction.T
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.ResponseCorrelator;
 import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.MarshallingSupport;
@@ -96,7 +98,7 @@ import org.apache.activemq.util.URISuppo
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import static org.apache.activemq.thread.DefaultThreadPools.*;
+import static 
org.apache.activemq.thread.DefaultThreadPools.getDefaultTaskRunnerFactory;
 /**
  * @version $Revision: 1.8 $
  */
@@ -149,6 +151,7 @@ public class TransportConnection impleme
     private final TaskRunnerFactory taskRunnerFactory;
     private TransportConnectionStateRegister connectionStateRegister = new 
SingleTransportConnectionStateRegister();
     private final ReentrantReadWriteLock serviceLock = new 
ReentrantReadWriteLock();
+    private BrokerId   duplexRemoteBrokerId;
 
     /**
      * @param connector
@@ -1178,6 +1181,20 @@ public class TransportConnection impleme
             // so this TransportConnection is the rear end of a network bridge
             // We have been requested to create a two way pipe ...
             try {
+                // We first look if existing network connection already exists 
for the same broker Id
+                // It's possible in case of brief network fault to have this 
transport connector side of the connection always active
+                // and the duplex network connector side wanting to open a new 
one
+                // In this case, the old connection must be broken
+                BrokerId       remoteBrokerId = info.getBrokerId();
+                setDuplexRemoteBrokerId(remoteBrokerId);
+                CopyOnWriteArrayList<TransportConnection> connections = 
this.connector.getConnections();
+                for (Iterator<TransportConnection> iter = 
connections.iterator(); iter.hasNext();) {
+                       TransportConnection c = iter.next();
+                    if ((c != this) && 
(remoteBrokerId.equals(c.getDuplexRemoteBrokerId()))) {
+                        LOG.warn("An existing duplex active connection already 
exists for this broker (" + remoteBrokerId + "). Stopping it.");
+                        c.stop();
+                    }
+                }
                 Properties properties = 
MarshallingSupport.stringToProperties(info.getNetworkProperties());
                 Map<String, String> props = createMap(properties);
                 NetworkBridgeConfiguration config = new 
NetworkBridgeConfiguration();
@@ -1198,6 +1215,9 @@ public class TransportConnection impleme
                 duplexBridge.duplexStart(this, brokerInfo, info);
                 LOG.info("Created Duplex Bridge back to " + 
info.getBrokerName());
                 return null;
+            } catch (TransportDisposedIOException e) {
+                LOG.warn("Duplex Bridge back to " + info.getBrokerName() + " 
was correctly stopped before it was correctly started.");
+                return null;
             } catch (Exception e) {
                 LOG.error("Creating duplex network bridge", e);
             }
@@ -1391,4 +1411,12 @@ public class TransportConnection impleme
     protected synchronized TransportConnectionState 
lookupConnectionState(ConnectionId connectionId) {
         return connectionStateRegister.lookupConnectionState(connectionId);
     }
+
+    protected synchronized void setDuplexRemoteBrokerId(BrokerId 
remoteBrokerId) {
+        this.duplexRemoteBrokerId = remoteBrokerId;
+    }
+
+    protected synchronized BrokerId getDuplexRemoteBrokerId() {
+        return this.duplexRemoteBrokerId;
+    }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=980014&r1=980013&r2=980014&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 Wed Jul 28 10:43:27 2010
@@ -233,14 +233,19 @@ public abstract class DemandForwardingBr
                 // initiator side of duplex network
                 remoteBrokerNameKnownLatch.await();
             }
-            try {
-                triggerRemoteStartBridge();
-            } catch (IOException e) {
-                LOG.warn("Caught exception from remote start", e);
-            }
-            NetworkBridgeListener l = this.networkBridgeListener;
-            if (l != null) {
-                l.onStart(this);
+            if (!disposed.get()) {
+                try {
+                    triggerRemoteStartBridge();
+                } catch (IOException e) {
+                    LOG.warn("Caught exception from remote start", e);
+                }
+                NetworkBridgeListener l = this.networkBridgeListener;
+                if (l != null) {
+                    l.onStart(this);
+                }
+           } else {
+                LOG.warn ("Bridge was disposed before the start() method was 
fully executed.");
+                throw new TransportDisposedIOException();
             }
         }
     }
@@ -285,30 +290,38 @@ public abstract class DemandForwardingBr
                 }
                 remoteBrokerNameKnownLatch.await();
 
-                localConnectionInfo = new ConnectionInfo();
-                localConnectionInfo.setConnectionId(new 
ConnectionId(idGenerator.generateId()));
-                localClientId = "NC_" + remoteBrokerName + "_inbound_" + 
configuration.getBrokerName();
-                localConnectionInfo.setClientId(localClientId);
-                localConnectionInfo.setUserName(configuration.getUserName());
-                localConnectionInfo.setPassword(configuration.getPassword());
-                Transport originalTransport = remoteBroker;
-                while (originalTransport instanceof TransportFilter) {
-                    originalTransport = ((TransportFilter) 
originalTransport).getNext();
-                }
-                if (originalTransport instanceof SslTransport) {
-                    X509Certificate[] peerCerts = ((SslTransport) 
originalTransport).getPeerCertificates();
-                    localConnectionInfo.setTransportContext(peerCerts);
-                }
-                localBroker.oneway(localConnectionInfo);
+                if (!disposed.get()) {
+                    localConnectionInfo = new ConnectionInfo();
+                    localConnectionInfo.setConnectionId(new 
ConnectionId(idGenerator.generateId()));
+                    localClientId = "NC_" + remoteBrokerName + "_inbound_" + 
configuration.getBrokerName();
+                    localConnectionInfo.setClientId(localClientId);
+                    
localConnectionInfo.setUserName(configuration.getUserName());
+                    
localConnectionInfo.setPassword(configuration.getPassword());
+                    Transport originalTransport = remoteBroker;
+                    while (originalTransport instanceof TransportFilter) {
+                        originalTransport = ((TransportFilter) 
originalTransport).getNext();
+                    }
+                    if (originalTransport instanceof SslTransport) {
+                        X509Certificate[] peerCerts = ((SslTransport) 
originalTransport).getPeerCertificates();
+                        localConnectionInfo.setTransportContext(peerCerts);
+                    }
+                    localBroker.oneway(localConnectionInfo);
 
-                localSessionInfo = new SessionInfo(localConnectionInfo, 1);
-                localBroker.oneway(localSessionInfo);
+                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
+                    localBroker.oneway(localSessionInfo);
 
-                LOG.info("Network connection between " + localBroker + " and " 
+ remoteBroker + "(" + remoteBrokerName + ") has been established.");
+                    LOG.info("Network connection between " + localBroker + " 
and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
 
+                } else {
+                    LOG.warn ("Bridge was disposed before the 
startLocalBridge() method was fully executed.");
+                }
                 startedLatch.countDown();
                 localStartedLatch.countDown();
-                setupStaticDestinations();
+                if (!disposed.get()) {
+                    setupStaticDestinations();
+                } else {
+                    LOG.warn("Network connection between " + localBroker + " 
and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during 
establishment.");
+                }
             }
         }
     }
@@ -408,6 +421,7 @@ public abstract class DemandForwardingBr
                 }
             }
             LOG.info(configuration.getBrokerName() + " bridge to " + 
remoteBrokerName + " stopped");
+            remoteBrokerNameKnownLatch.countDown();
         }
     }
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=980014&r1=980013&r2=980014&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
 Wed Jul 28 10:43:27 2010
@@ -21,13 +21,13 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.broker.SslContext;
 import org.apache.activemq.command.DiscoveryEvent;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.discovery.DiscoveryAgent;
+import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
 import org.apache.activemq.transport.discovery.DiscoveryListener;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -128,6 +128,8 @@ public class DiscoveryNetworkConnector e
             try {
                 bridge.start();
                 bridges.put(uri, bridge);
+           } catch (TransportDisposedIOException e) {
+                LOG.warn("Network bridge between: " + localURI + " and: " + 
uri + " was correctly stopped before it was correctly started.");
             } catch (Exception e) {
                 ServiceSupport.dispose(localTransport);
                 ServiceSupport.dispose(remoteTransport);

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java?rev=980014&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
 Wed Jul 28 10:43:27 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.util.Random;
+import javax.net.ServerSocketFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ */
+public class ServerSocketTstFactory extends ServerSocketFactory {
+    private static final Log LOG = 
LogFactory.getLog(ServerSocketTstFactory.class);
+
+    private class ServerSocketTst {
+
+       private final   ServerSocket    socket;
+
+       public ServerSocketTst(int port, Random rnd) throws IOException {
+               this.socket = 
ServerSocketFactory.getDefault().createServerSocket(port);
+       }
+
+       public ServerSocketTst(int port, int backlog, Random rnd) throws 
IOException {
+               this.socket = 
ServerSocketFactory.getDefault().createServerSocket(port, backlog);
+       }
+
+       public ServerSocketTst(int port, int backlog, InetAddress bindAddr, 
Random rnd) throws IOException {
+               this.socket = 
ServerSocketFactory.getDefault().createServerSocket(port, backlog, bindAddr);
+       }
+
+       public ServerSocket     getSocket() {
+               return this.socket;
+       }
+    };
+
+   private final Random        rnd;
+
+   public ServerSocketTstFactory() {
+       super();
+       LOG.info("Creating a new ServerSocketTstFactory");
+       this.rnd = new Random();
+   }
+
+   public ServerSocket createServerSocket(int port) throws IOException {
+       ServerSocketTst sSock = new ServerSocketTst(port, this.rnd);
+       return sSock.getSocket();
+   }
+
+   public ServerSocket createServerSocket(int port, int backlog) throws 
IOException {
+       ServerSocketTst sSock = new ServerSocketTst(port, backlog, this.rnd);
+       return sSock.getSocket();
+   }
+
+   public ServerSocket createServerSocket(int port, int backlog, InetAddress 
ifAddress) throws IOException {
+       ServerSocketTst sSock = new ServerSocketTst(port, backlog, ifAddress, 
this.rnd);
+       return sSock.getSocket();
+   }
+
+   private final static ServerSocketTstFactory server = new 
ServerSocketTstFactory();
+
+   public static ServerSocketTstFactory getDefault() {
+       return server;
+   }
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java?rev=980014&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
 Wed Jul 28 10:43:27 2010
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.net.SocketFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision$
+ *
+ * Automatically generated socket.close() calls to simulate network faults
+ */
+public class SocketTstFactory extends SocketFactory {
+    private static final Log LOG = LogFactory.getLog(SocketTstFactory.class);
+
+    private static final ConcurrentHashMap<InetAddress, Integer>       
closeIter = new ConcurrentHashMap<InetAddress, Integer>();
+
+    private class SocketTst {
+
+       private class Bagot implements Runnable {
+               private Thread                  processus;
+               private Random  rnd;
+               private Socket                  socket;
+               private final InetAddress       address;
+
+               public Bagot(Random rnd, Socket socket, InetAddress address) {
+                       this.processus  = new Thread(this, "Network Faults 
maker : undefined");
+                       this.rnd        = rnd;
+                       this.socket     = socket;
+                       this.address    = address;
+               }
+
+               public void start() {
+                       this.processus.setName("Network Faults maker : " + 
this.socket.toString());
+                       this.processus.start();
+               }
+
+               public void run () {
+                       int     lastDelayVal;
+                       Integer lastDelay;
+                       while (!this.processus.isInterrupted()) {
+                               if (!this.socket.isClosed()) {
+                                       try {
+                                               lastDelay = 
closeIter.get(this.address);
+                                               if (lastDelay == null) { 
+                                                       lastDelayVal = 0;
+                                               }
+                                               else {
+                                                       lastDelayVal = 
lastDelay.intValue();
+                                                       if (lastDelayVal > 10)
+                                                               lastDelayVal += 
20;
+                                                       else    lastDelayVal += 
1;      
+                                               }
+
+                                               lastDelay = new 
Integer(lastDelayVal);
+
+                                               LOG.info("Trying to close 
client socket " + socket.toString() +  " in " + lastDelayVal + " milliseconds");
+
+                                               try {
+                                                       
Thread.sleep(lastDelayVal);
+                                               } catch (InterruptedException 
e) {
+                                                       
this.processus.interrupt();
+                                                       
Thread.currentThread().interrupt();
+                                               } catch 
(IllegalArgumentException e) {
+                                               }
+                                                       
+                                               this.socket.close();
+                                               closeIter.put(this.address, 
lastDelay);
+                                               LOG.info("Client socket " + 
this.socket.toString() + " is closed.");
+                                       } catch (IOException e) {
+                                       }
+                               }
+
+                               this.processus.interrupt();
+                       }
+               }
+       }
+
+       private final Bagot             bagot;
+       private final Socket            socket;
+
+       public SocketTst(InetAddress address, int port, Random rnd) throws 
IOException {
+               this.socket = new Socket(address, port);
+               bagot = new Bagot(rnd, this.socket, address);
+       }
+
+       public SocketTst(InetAddress address, int port, InetAddress localAddr, 
int localPort, Random rnd) throws IOException {
+               this.socket = new Socket(address, port, localAddr, localPort);
+               bagot = new Bagot(rnd, this.socket, address);
+       }
+
+       public SocketTst(String address, int port, Random rnd) throws 
UnknownHostException, IOException {
+               this.socket = new Socket(address, port);
+               bagot = new Bagot(rnd, this.socket, 
InetAddress.getByName(address));
+       }
+
+       public SocketTst(String address, int port, InetAddress localAddr, int 
localPort, Random rnd) throws IOException {
+               this.socket = new Socket(address, port, localAddr, localPort);
+               bagot = new Bagot(rnd, this.socket, 
InetAddress.getByName(address));
+       }
+
+       public Socket getSocket() {
+               return this.socket;
+       }
+
+       public void startBagot() {
+               bagot.start();
+       }
+    };
+
+    private final Random               rnd;
+
+    public SocketTstFactory() {
+       super();
+       LOG.info("Creating a new SocketTstFactory");
+       this.rnd        = new Random();
+    }
+
+    public Socket createSocket(InetAddress host, int port) throws IOException {
+       SocketTst sockTst;
+       sockTst = new SocketTst(host, port, this.rnd);
+       sockTst.startBagot();
+       return sockTst.getSocket();
+    }
+
+    public Socket createSocket(InetAddress host, int port, InetAddress 
localAddress, int localPort) throws IOException {
+       SocketTst       sockTst;
+       sockTst = new SocketTst(host, port, localAddress, localPort, this.rnd);
+       sockTst.startBagot();
+       return sockTst.getSocket();
+    }
+
+    public Socket createSocket(String host, int port) throws IOException {
+       SocketTst       sockTst;
+       sockTst = new SocketTst(host, port, this.rnd);
+       sockTst.startBagot();
+       return sockTst.getSocket();
+    }
+
+    public Socket createSocket(String host, int port, InetAddress 
localAddress, int localPort) throws IOException {
+       SocketTst       sockTst;
+       sockTst = new SocketTst(host, port, localAddress, localPort, this.rnd);
+       sockTst.startBagot();
+       return sockTst.getSocket();
+    }
+
+    private final static SocketTstFactory client = new SocketTstFactory();
+
+    public static SocketFactory getDefault() {
+       return client;
+    }
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java?rev=980014&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
 Wed Jul 28 10:43:27 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+
+import org.apache.activemq.wireformat.WireFormat;
+
+import javax.net.SocketFactory;
+
+/**
+ * An implementation of the {...@link Transport} interface using raw tcp/ip
+ * 
+ * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging 
improvement modifications)
+ * @version $Revision$
+ */
+public class TcpFaultyTransport extends TcpTransport implements Transport, 
Service, Runnable {
+
+    public TcpFaultyTransport(WireFormat wireFormat, SocketFactory 
socketFactory, URI remoteLocation,
+                        URI localLocation) throws UnknownHostException, 
IOException {
+       super(wireFormat, socketFactory, remoteLocation, localLocation);
+    }
+
+    /**
+     * @return pretty print of 'this'
+     */
+    public String toString() {
+        return "tcpfaulty://" + socket.getInetAddress() + ":" + 
socket.getPort();
+    }
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java?rev=980014&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
 Wed Jul 28 10:43:27 2010
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.InactivityMonitor;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportLoggerFactory;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.WireFormatNegotiator;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.net.Socket;
+import java.net.ServerSocket;
+import java.net.InetAddress;
+
+import org.apache.activemq.transport.tcp.ServerSocketTstFactory;
+
+/**
+ * Automatically generated socket.close() calls to simulate network faults
+ */
+public class TcpFaultyTransportFactory extends TcpTransportFactory {
+    private static final Log LOG = 
LogFactory.getLog(TcpFaultyTransportFactory.class);
+
+   protected TcpFaultyTransport createTcpFaultyTransport(WireFormat wf, 
SocketFactory socketFactory, URI location, URI localLocation) throws 
UnknownHostException, IOException {
+        return new TcpFaultyTransport(wf, socketFactory, location, 
localLocation);
+    }
+
+    protected Transport createTransport(URI location, WireFormat wf) throws 
UnknownHostException, IOException {
+        URI localLocation = null;
+        String path = location.getPath();
+        // see if the path is a local URI location
+        if (path != null && path.length() > 0) {
+            int localPortIndex = path.indexOf(':');
+            try {
+                Integer.parseInt(path.substring(localPortIndex + 1, 
path.length()));
+                String localString = location.getScheme() + ":/" + path;
+                localLocation = new URI(localString);
+            } catch (Exception e) {
+                LOG.warn("path isn't a valid local location for TcpTransport 
to use", e);
+            }
+        }
+        SocketFactory socketFactory = createSocketFactory();
+        return createTcpFaultyTransport(wf, socketFactory, location, 
localLocation);
+    }
+
+    protected TcpFaultyTransportServer createTcpFaultyTransportServer(final 
URI location, ServerSocketFactory serverSocketFactory) throws IOException, 
URISyntaxException     {
+        return new TcpFaultyTransportServer(this, location, 
serverSocketFactory);
+    }
+
+    public TransportServer doBind(final URI location) throws IOException {
+        try {
+            Map<String, String> options = new HashMap<String, 
String>(URISupport.parseParamters(location));
+
+            ServerSocketFactory serverSocketFactory = 
createServerSocketFactory();
+            TcpFaultyTransportServer server = 
createTcpFaultyTransportServer(location, serverSocketFactory);
+            server.setWireFormatFactory(createWireFormatFactory(options));
+            IntrospectionSupport.setProperties(server, options);
+            Map<String, Object> transportOptions = 
IntrospectionSupport.extractProperties(options, "transport.");
+            server.setTransportOption(transportOptions);
+            server.bind();
+
+            return server;
+        } catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    
+    protected SocketFactory createSocketFactory() throws IOException {
+       return SocketTstFactory.getDefault();
+    }
+
+    
+    protected ServerSocketFactory createServerSocketFactory() throws 
IOException {
+       return ServerSocketTstFactory.getDefault();
+    }
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java?rev=980014&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
 Wed Jul 28 10:43:27 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import org.apache.activemq.util.ServiceListener;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+
+import javax.net.ServerSocketFactory;
+
+/**
+ * A TCP based implementation of {...@link TransportServer}
+ * 
+ * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging 
improvement modifications)
+ * @version $Revision$
+ */
+
+public class TcpFaultyTransportServer extends TcpTransportServer implements 
ServiceListener{
+
+    public TcpFaultyTransportServer(TcpFaultyTransportFactory 
transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws 
IOException, URISyntaxException {
+       super(transportFactory, location, serverSocketFactory);
+    }
+
+    /**
+     * @return pretty print of this
+     */
+    public String toString() {
+        return "" + getBindLocation();
+    }
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java?rev=980014&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
 Wed Jul 28 10:43:27 2010
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.List;
+
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import junit.framework.Test;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.SocketProxy;
+
+
+public class MulticastDiscoveryOnFaultyNetworkTest extends 
JmsMultipleBrokersTestSupport {
+    protected static final int MESSAGE_COUNT = 200;
+    private static final String HUB = "HubBroker";
+    private static final String SPOKE = "SpokeBroker";
+    public boolean useDuplexNetworkBridge;
+    public boolean sumulateStalledNetwork;
+
+   private TransportConnector mCastTrpConnector;
+   
+    public void initCombosForTestSendOnAFaultyTransport() {
+        addCombinationValues( "useDuplexNetworkBridge", new Object[]{ 
Boolean.TRUE , Boolean.FALSE } );
+        addCombinationValues( "sumulateStalledNetwork", new Object[]{ 
Boolean.TRUE } );
+    }
+    
+    public void testSendOnAFaultyTransport() throws Exception {
+        bridgeBrokers(SPOKE, HUB);
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", false);
+        
+        // Setup consumers
+        MessageConsumer client = createConsumer(HUB, dest);
+        
+        // allow subscription information to flow back to Spoke
+        sleep(600);
+        
+        // Send messages
+        sendMessages(SPOKE, dest, MESSAGE_COUNT);
+
+        MessageIdList msgs = getConsumerMessages(HUB, client);
+       msgs.setMaximumDuration(200000L);
+        msgs.waitForMessagesToArrive(MESSAGE_COUNT);
+
+        assertTrue("At least message " + MESSAGE_COUNT + 
+                " must be recieved, duplicates are expected, count=" + 
msgs.getMessageCount(),
+                MESSAGE_COUNT <= msgs.getMessageCount());
+    }
+
+    
+    @Override
+    protected void startAllBrokers() throws Exception {
+        // Ensure HUB is started first so bridge will be active from the get go
+        BrokerItem brokerItem = brokers.get(HUB);
+        brokerItem.broker.start();
+        brokerItem = brokers.get(SPOKE);
+        brokerItem.broker.start();
+    }
+
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        final String options = 
"?persistent=false&useJmx=false&deleteAllMessagesOnStartup=true";
+        createBroker(new URI("broker:(tcpfaulty://localhost:61617)/" + HUB + 
options));
+        createBroker(new URI("broker:(tcpfaulty://localhost:61616)/" + SPOKE + 
options));
+    }
+    
+    public static Test suite() {
+        return suite(MulticastDiscoveryOnFaultyNetworkTest.class);
+    }
+       
+    @Override
+    protected void onSend(int i, TextMessage msg) {
+        sleep(50);
+    }
+
+    private void sleep(int milliSecondTime) {
+        try {
+            Thread.sleep(milliSecondTime);
+        } catch (InterruptedException igonred) {
+        }    
+    }
+
+
+    @Override
+    protected NetworkConnector bridgeBrokers(BrokerService localBroker, 
BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean 
conduit) throws Exception {
+        DiscoveryNetworkConnector connector = new 
DiscoveryNetworkConnector(new URI("multicast://default?group=TESTERIC"));
+        connector.setDynamicOnly(dynamicOnly);
+        connector.setNetworkTTL(networkTTL);
+        localBroker.addNetworkConnector(connector);
+        maxSetupTime = 2000;
+        if (useDuplexNetworkBridge) {
+            connector.setDuplex(true);
+        }
+
+        List<TransportConnector> transportConnectors = 
remoteBroker.getTransportConnectors();
+        if (!transportConnectors.isEmpty()) {
+               mCastTrpConnector = 
((TransportConnector)transportConnectors.get(0));
+               mCastTrpConnector.setDiscoveryUri(new 
URI("multicast://default?group=TESTERIC"));
+       }
+       return connector;
+    }
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty?rev=980014&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty
 (added)
+++ 
activemq/trunk/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty
 Wed Jul 28 10:43:27 2010
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.tcp.TcpFaultyTransportFactory


Reply via email to