Added: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs?rev=707747&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
 Fri Oct 24 14:10:22 2008
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.ActiveMQ.Threads
+{
+       /// <summary>
+       /// Factory for the task runners that service long running tasks. Long running
+       /// tasks are not always active, but when they are active they may need a few
+       /// iterations of processing before they become idle. The intent is that each
+       /// task is processed but that no one task overtakes the system, which is kind
+       /// of like cooperative multitasking.
+       /// </summary>
+       public class TaskRunnerFactory
+       {
+               // Upper bound on iterations handed to every runner created by this factory.
+               private int maxIterationsPerRun;
+               // The three settings below are recorded by initTaskRunnerFactory; nothing in
+               // this class reads them back.
+               private String name;
+               private ThreadPriority priority;
+               private bool daemon;
+
+               /// <summary>
+               /// Creates a factory with the defaults: name "ActiveMQ Task", normal thread
+               /// priority, daemon enabled and 1000 iterations per run.
+               /// </summary>
+               public TaskRunnerFactory()
+                       : this("ActiveMQ Task", ThreadPriority.Normal, true, 1000, false)
+               {
+               }
+
+               /// <summary>
+               /// Creates a factory with the given settings and no dedicated task runner.
+               /// </summary>
+               public TaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int maxIterationsPerRun)
+                       : this(name, priority, daemon, maxIterationsPerRun, false)
+               {
+               }
+
+               /// <summary>
+               /// Creates a factory with the given settings.
+               /// </summary>
+               public TaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int maxIterationsPerRun, bool dedicatedTaskRunner)
+               {
+                       initTaskRunnerFactory(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner);
+               }
+
+               /// <summary>
+               /// Stores the supplied settings on this factory.
+               /// </summary>
+               /// <param name="name">Name recorded for the factory.</param>
+               /// <param name="priority">Thread priority recorded for the factory.</param>
+               /// <param name="daemon">Daemon flag recorded for the factory.</param>
+               /// <param name="maxIterationsPerRun">Iteration limit passed to created runners.</param>
+               /// <param name="dedicatedTaskRunner">Accepted but not used by this implementation.</param>
+               public void initTaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int maxIterationsPerRun, bool dedicatedTaskRunner)
+               {
+                       this.maxIterationsPerRun = maxIterationsPerRun;
+                       this.daemon = daemon;
+                       this.priority = priority;
+                       this.name = name;
+
+                       // If your OS/JVM combination has a good thread model, you may want to avoid
+                       // using a thread pool to run tasks and use a DedicatedTaskRunner instead.
+               }
+
+               /// <summary>
+               /// Currently a no-op.
+               /// </summary>
+               public void shutdown()
+               {
+               }
+
+               /// <summary>
+               /// Creates a pooled runner for <paramref name="task"/> limited to the
+               /// configured number of iterations per run.
+               /// </summary>
+               /// <param name="task">The task the runner will drive.</param>
+               /// <param name="name">Accepted but not used by this implementation.</param>
+               /// <returns>A new PooledTaskRunner.</returns>
+               public TaskRunner CreateTaskRunner(Task task, String name)
+               {
+                       TaskRunner runner = new PooledTaskRunner(task, maxIterationsPerRun);
+                       return runner;
+               }
+       }
+}

Added: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs?rev=707747&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
 Fri Oct 24 14:10:22 2008
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.Transport.Failover
+{
+       /// <summary>
+       /// Pairs a transport with the URI it is associated with and the owning
+       /// FailoverTransport. Two instances are considered equal when their URIs
+       /// are equal (or both null).
+       /// </summary>
+       class BackupTransport
+       {
+               private FailoverTransport failoverTransport;
+               private ITransport transport;
+               private Uri uri;
+               private bool disposed;
+
+               /// <summary>
+               /// Creates a backup entry owned by the given failover transport.
+               /// </summary>
+               /// <param name="ft">Owning failover transport; may be null.</param>
+               public BackupTransport(FailoverTransport ft)
+               {
+                       this.failoverTransport = ft;
+               }
+
+               /// <summary>
+               /// Command callback; commands are ignored.
+               /// </summary>
+               public void onCommand(ITransport t, Command c)
+               {
+               }
+
+               /// <summary>
+               /// Exception callback: marks this backup as disposed and, when an owner
+               /// was supplied, asks it to reconnect.
+               /// </summary>
+               public void onException(ITransport t, Exception error)
+               {
+                       this.disposed = true;
+                       if(failoverTransport != null)
+                       {
+                               this.failoverTransport.Reconnect();
+                       }
+               }
+
+               /// <summary>
+               /// Gets or sets the wrapped transport.
+               /// </summary>
+               public ITransport Transport
+               {
+                       get
+                       {
+                               return transport;
+                       }
+                       set
+                       {
+                               transport = value;
+                       }
+               }
+
+               /// <summary>
+               /// Gets or sets the URI associated with this backup; it also defines
+               /// equality and the hash code.
+               /// </summary>
+               public Uri Uri
+               {
+                       get
+                       {
+                               return uri;
+                       }
+                       set
+                       {
+                               uri = value;
+                       }
+               }
+
+               /// <summary>
+               /// True when this backup was flagged as disposed or when the wrapped
+               /// transport reports IsDisposed. The setter only changes the local flag.
+               /// </summary>
+               public bool Disposed
+               {
+                       get
+                       {
+                               return disposed || (transport != null && transport.IsDisposed);
+                       }
+                       set
+                       {
+                               disposed = value;
+                       }
+               }
+
+               /// <summary>
+               /// Java-style alias kept for existing callers; forwards to GetHashCode().
+               /// </summary>
+               public int hashCode()
+               {
+                       return GetHashCode();
+               }
+
+               /// <summary>
+               /// Java-style alias kept for existing callers; forwards to Equals(Object).
+               /// </summary>
+               public bool equals(Object obj)
+               {
+                       return Equals(obj);
+               }
+
+               /// <summary>
+               /// Hash code of the URI, or -1 when no URI is set. Because the URI is
+               /// settable, do not change it while the instance is a key in a hashed
+               /// collection.
+               /// </summary>
+               public override int GetHashCode()
+               {
+                       return uri != null ? uri.GetHashCode() : -1;
+               }
+
+               /// <summary>
+               /// URI-based equality, so that collection lookups such as List.Contains
+               /// compare by URI instead of by reference.
+               /// </summary>
+               /// <param name="obj">Object to compare with.</param>
+               /// <returns>True when obj is a BackupTransport whose URI equals this one's, or both URIs are null.</returns>
+               public override bool Equals(Object obj)
+               {
+                       BackupTransport other = obj as BackupTransport;
+                       if(other == null)
+                       {
+                               return false;
+                       }
+                       if(uri == null)
+                       {
+                               return other.uri == null;
+                       }
+                       return other.uri != null && uri.Equals(other.uri);
+               }
+       }
+}

Added: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=707747&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
 Fri Oct 24 14:10:22 2008
@@ -0,0 +1,1142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.State;
+using Apache.NMS.ActiveMQ.Threads;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Failover
+{
+       /// <summary>
+       /// A Transport that is made reliable by being able to fail over to 
another
+       /// transport when a transport failure is detected.
+       /// </summary>
+       public class FailoverTransport : ICompositeTransport, IComparable
+       {
+
+               private static int _idCounter = 0; // shared counter; the constructor copies it into _id and then increments it
+               private int _id; // per-instance id taken from _idCounter in the constructor
+
+               private bool disposed; // set to true by Stop(); checked by FailoverTask.iterate() and the Oneway() send loop
+               private bool connected; // cleared by handleTransportFailure() and Stop()
+               private List<Uri> uris = new List<Uri>();
+               private CommandHandler _commandHandler;
+               private ExceptionHandler _exceptionHandler;
+
+               private Mutex reconnectMutex = new Mutex(); // held by Start(), Stop(), handleTransportFailure() and Oneway()
+               private Mutex backupMutex = new Mutex(); // held by FailoverTask.iterate() around the reconnect attempt
+               private Mutex sleepMutex = new Mutex(); // Stop() acquires and immediately releases it before shutting down reconnectTask
+               private Mutex listenerMutex = new Mutex();
+               private ConnectionStateTracker stateTracker = new 
ConnectionStateTracker(); // configured in the constructor and Start(); DoRestore() is called from Start()
+               private Dictionary<int, Command> requestMap = new 
Dictionary<int, Command>(); // keyed by correlation id; onCommand() looks up and removes entries under lock(requestMap)
+
+               private Uri connectedTransportURI;
+               private Uri failedConnectTransportURI; // receives the previous ConnectedTransportURI in handleTransportFailure()
+               private AtomicReference<ITransport> connectedTransport = new 
AtomicReference<ITransport>(null); // swapped to null via GetAndSet() in handleTransportFailure() and Stop()
+               private TaskRunner reconnectTask = null; // woken by handleTransportFailure(), shut down by Stop(); NOTE(review): assigned outside the portion visible here
+               private bool started; // set by Start(), cleared by Stop()
+
+               private int _initialReconnectDelay = 10; // NOTE(review): delay values are presumably milliseconds - confirm
+               private int _maxReconnectDelay = 1000 * 30;
+               private int _backOffMultiplier = 2; // exposed as ReconnectDelayExponent
+               private bool _useExponentialBackOff = true;
+               private bool _randomize = true;
+               private bool initialized; // set once a BrokerInfo command was seen in onCommand(); reset by handleTransportFailure()
+               private int _maxReconnectAttempts;
+               private int connectFailures;
+               private int _reconnectDelay = 10;
+               private Exception connectionFailure; // when non-null, the Oneway() wait loop stops waiting for a transport
+               private bool firstConnection = true;
+               //optionally always have a backup created
+               private bool _backup = false;
+               private List<BackupTransport> backups = new 
List<BackupTransport>(); // entries are flagged Disposed and the list cleared in Stop()
+               private int _backupPoolSize = 1;
+               private bool _trackMessages = false; // copied to stateTracker.TrackMessages in Start()
+               private int _maxCacheSize = 256; // copied to stateTracker.MaxCacheSize in Start()
+               private TimeSpan requestTimeout = 
NMSConstants.defaultRequestTimeout;
+
+               /// <summary>
+               /// Gets or sets the request timeout. The backing field starts out as
+               /// NMSConstants.defaultRequestTimeout.
+               /// </summary>
+               public TimeSpan RequestTimeout
+               {
+                       get { return this.requestTimeout; }
+                       set { this.requestTimeout = value; }
+               }
+
+               /// <summary>
+               /// Task that performs one step of failover work for its parent transport:
+               /// either a reconnect attempt (when no transport is connected and the parent
+               /// is not disposed) or a call to the parent's buildBackups().
+               /// </summary>
+               private class FailoverTask : Task
+               {
+                       private FailoverTransport parent;
+
+                       /// <summary>
+                       /// Creates a task bound to the given transport.
+                       /// </summary>
+                       public FailoverTask(FailoverTransport p)
+                       {
+                               parent = p;
+                       }
+
+                       /// <summary>
+                       /// Runs one iteration of failover work.
+                       /// </summary>
+                       /// <returns>
+                       /// True when a reconnect was attempted (the task also wakes itself so that
+                       /// backups are built on the next iteration); false after building backups.
+                       /// </returns>
+                       public bool iterate()
+                       {
+                               bool result = false;
+                               bool buildBackup = true;
+                               try
+                               {
+                                       parent.backupMutex.WaitOne();
+                                       if(parent.ConnectedTransport == null && !parent.disposed)
+                                       {
+                                               // NOTE: this value is overwritten with true below, so the
+                                               // outcome of doReconnect() does not reach the caller.
+                                               result = parent.doReconnect();
+                                               buildBackup = false;
+                                       }
+                               }
+                               finally
+                               {
+                                       parent.backupMutex.ReleaseMutex();
+                               }
+                               if(buildBackup)
+                               {
+                                       parent.buildBackups();
+                               }
+                               else
+                               {
+                                       //build backups on the next iteration
+                                       result = true;
+                                       try
+                                       {
+                                               parent.reconnectTask.wakeup();
+                                       }
+                                       catch(ThreadInterruptedException)
+                                       {
+                                               Tracer.Debug("Reconnect task has been interrupted.");
+                                       }
+                               }
+                               return result;
+                       }
+               }
+
+               /// <summary>
+               /// Creates a failover transport, assigns it the next instance id and turns on
+               /// transaction tracking in the connection state tracker.
+               /// </summary>
+               public FailoverTransport()
+               {
+                       // _idCounter is shared by every instance, so take the next id atomically.
+                       // Interlocked.Increment returns the incremented value; subtracting one
+                       // keeps the ids starting at 0 exactly as the post-increment did.
+                       _id = Interlocked.Increment(ref _idCounter) - 1;
+
+                       stateTracker.TrackTransactions = true;
+               }
+
+               /// <summary>
+               /// Finalizer; forwards to Dispose(false).
+               /// </summary>
+               ~FailoverTransport()
+               {
+                       this.Dispose(false);
+               }
+
+               /// <summary>
+               /// Handles a command received from the underlying transport. A response
+               /// removes the matching entry from the request map and, if that entry is a
+               /// Tracked command, notifies it. Until the transport is initialized, a
+               /// BrokerInfo command contributes its peer broker URLs via add(). The command
+               /// is then forwarded to the registered command handler.
+               /// </summary>
+               /// <param name="sender">Transport the command arrived on.</param>
+               /// <param name="command">The received command; may be null.</param>
+               public void onCommand(ITransport sender, Command command)
+               {
+                       if(command != null)
+                       {
+                               if(command.IsResponse)
+                               {
+                                       Object oo = null;
+                                       lock(requestMap)
+                                       {
+                                               int v = ((Response) command).CorrelationId;
+                                               // A response without a pending request is expected, so look
+                                               // it up without relying on an exception for the miss.
+                                               Command pending;
+                                               if(requestMap.TryGetValue(v, out pending))
+                                               {
+                                                       oo = pending;
+                                                       requestMap.Remove(v);
+                                               }
+                                       }
+                                       Tracked t = oo as Tracked;
+                                       if(t != null)
+                                       {
+                                               t.onResponses();
+                                       }
+                               }
+                               if(!initialized)
+                               {
+                                       if(command.IsBrokerInfo)
+                                       {
+                                               BrokerInfo info = (BrokerInfo) command;
+                                               BrokerInfo[] peers = info.PeerBrokerInfos;
+                                               if(peers != null)
+                                               {
+                                                       for(int i = 0; i < peers.Length; i++)
+                                                       {
+                                                               String brokerString = peers[i].BrokerURL;
+                                                               add(brokerString);
+                                                       }
+                                               }
+                                               initialized = true;
+                                       }
+                               }
+                       }
+                       this.Command(sender, command);
+               }
+
+               /// <summary>
+               /// Exception callback for the underlying transport; delegates to
+               /// handleTransportFailure. Any exception raised while handling the failure is
+               /// logged and not propagated to the caller.
+               /// </summary>
+               /// <param name="sender">Transport that reported the error.</param>
+               /// <param name="error">The reported error.</param>
+               public void onException(ITransport sender, Exception error)
+               {
+                       try
+                       {
+                               handleTransportFailure(error);
+                       }
+                       catch(Exception e)
+                       {
+                               // Best effort: keep the failure away from the reporting transport,
+                               // but leave a trace instead of dropping it silently.
+                               Tracer.Warn("Failed to handle transport failure: " + e);
+                       }
+               }
+
+               /// <summary>
+               /// Command handler installed on a failed transport by handleTransportFailure;
+               /// it discards every command.
+               /// </summary>
+               public void disposedOnCommand(ITransport sender, Command c)
+               {
+                       // Intentionally empty.
+               }
+
+               /// <summary>
+               /// Exception handler installed on a failed transport by
+               /// handleTransportFailure; it discards every error.
+               /// </summary>
+               public void disposedOnException(ITransport sender, Exception e)
+               {
+                       // Intentionally empty.
+               }
+
+               /// <summary>
+               /// Reacts to a failure of the connected transport. The current transport is
+               /// detached atomically, its handlers are replaced with no-op handlers and it
+               /// is stopped (errors from Stop are ignored). Then, under reconnectMutex, the
+               /// connection state is reset and, if this transport is started, the reconnect
+               /// task is woken. Does nothing when no transport was connected.
+               /// </summary>
+               /// <param name="e">The failure that was reported.</param>
+               public void handleTransportFailure(Exception e)
+               {
+                       ITransport failed = connectedTransport.GetAndSet(null);
+                       if(failed == null)
+                       {
+                               return;
+                       }
+
+                       // Silence the failed transport before stopping it.
+                       failed.Command = new CommandHandler(disposedOnCommand);
+                       failed.Exception = new ExceptionHandler(disposedOnException);
+                       try
+                       {
+                               failed.Stop();
+                       }
+                       catch(Exception)
+                       {
+                               // Errors while stopping an already failed transport are ignored.
+                       }
+
+                       try
+                       {
+                               reconnectMutex.WaitOne();
+
+                               // Only a started transport reconnects automatically.
+                               bool reconnectOk = started;
+                               if(reconnectOk)
+                               {
+                                       Tracer.Warn("Transport failed to " + ConnectedTransportURI + " , attempting to automatically reconnect due to: " + e.Message);
+                                       Tracer.Debug("Transport failed with the following exception:" + e.Message);
+                               }
+
+                               initialized = false;
+                               failedConnectTransportURI = ConnectedTransportURI;
+                               ConnectedTransportURI = null;
+                               connected = false;
+
+                               if(reconnectOk)
+                               {
+                                       reconnectTask.wakeup();
+                               }
+                       }
+                       finally
+                       {
+                               reconnectMutex.ReleaseMutex();
+                       }
+               }
+
+               /// <summary>
+               /// Starts the transport under reconnectMutex. A second call is a no-op. On the
+               /// first call the state tracker is configured from MaxCacheSize and
+               /// TrackMessages; then either the tracked state is restored onto the already
+               /// connected transport or a reconnect is requested.
+               /// </summary>
+               public void Start()
+               {
+                       try
+                       {
+                               reconnectMutex.WaitOne();
+                               Tracer.Debug("Started.");
+                               if(started)
+                               {
+                                       return;
+                               }
+                               started = true;
+                               stateTracker.MaxCacheSize = MaxCacheSize;
+                               stateTracker.TrackMessages = TrackMessages;
+
+                               // Read the reference once: handleTransportFailure() can clear it
+                               // without holding reconnectMutex, so a second read could return null.
+                               ITransport current = ConnectedTransport;
+                               if(current != null)
+                               {
+                                       stateTracker.DoRestore(current);
+                               }
+                               else
+                               {
+                                       Reconnect();
+                               }
+                       }
+                       finally
+                       {
+                               reconnectMutex.ReleaseMutex();
+                       }
+               }
+
+               /// <summary>
+               /// Stops the transport. Under reconnectMutex it marks the transport as stopped
+               /// and disposed, flags and clears all backups and detaches the connected
+               /// transport. It then passes through sleepMutex, shuts down the reconnect
+               /// task and stops the detached transport. Does nothing when not started.
+               /// </summary>
+               public virtual void Stop()
+               {
+                       ITransport transportToStop = null;
+                       try
+                       {
+                               reconnectMutex.WaitOne();
+                               Tracer.Debug("Stopped.");
+                               if(!started)
+                               {
+                                       return;
+                               }
+                               started = false;
+                               disposed = true;
+                               connected = false;
+                               foreach(BackupTransport t in backups)
+                               {
+                                       t.Disposed = true;
+                               }
+                               backups.Clear();
+
+                               // A single atomic swap; it yields null when nothing was connected.
+                               transportToStop = connectedTransport.GetAndSet(null);
+                       }
+                       finally
+                       {
+                               reconnectMutex.ReleaseMutex();
+                       }
+                       // Acquire and release sleepMutex so that this call blocks while another
+                       // thread holds it.
+                       try
+                       {
+                               sleepMutex.WaitOne();
+                       }
+                       finally
+                       {
+                               sleepMutex.ReleaseMutex();
+                       }
+                       try
+                       {
+                               // NOTE(review): reconnectTask is initialised to null and assigned
+                               // outside the code visible here, so guard against it being unset.
+                               if(reconnectTask != null)
+                               {
+                                       reconnectTask.shutdown();
+                               }
+                       }
+                       finally
+                       {
+                               // Stop the detached transport even if shutting down the task failed.
+                               if(transportToStop != null)
+                               {
+                                       transportToStop.Stop();
+                               }
+                       }
+               }
+
+               /// <summary>
+               /// Gets or sets the initial reconnect delay (backing field defaults to 10).
+               /// NOTE(review): presumably milliseconds - confirm against the reconnect code.
+               /// </summary>
+               public int InitialReconnectDelay
+               {
+                       get { return this._initialReconnectDelay; }
+                       set { this._initialReconnectDelay = value; }
+               }
+
+               /// <summary>
+               /// Gets or sets the maximum reconnect delay (backing field defaults to
+               /// 1000 * 30). NOTE(review): presumably milliseconds - confirm.
+               /// </summary>
+               public int MaxReconnectDelay
+               {
+                       get { return this._maxReconnectDelay; }
+                       set { this._maxReconnectDelay = value; }
+               }
+
+               /// <summary>
+               /// Gets or sets the current reconnect delay (backing field defaults to 10).
+               /// </summary>
+               public int ReconnectDelay
+               {
+                       get { return this._reconnectDelay; }
+                       set { this._reconnectDelay = value; }
+               }
+
+               /// <summary>
+               /// Gets or sets the back-off multiplier; this property is backed by the
+               /// _backOffMultiplier field (default 2).
+               /// </summary>
+               public int ReconnectDelayExponent
+               {
+                       get { return this._backOffMultiplier; }
+                       set { this._backOffMultiplier = value; }
+               }
+
+               /// <summary>
+               /// Gets or sets the currently connected transport, stored in the
+               /// connectedTransport atomic reference; null when nothing is connected.
+               /// </summary>
+               public ITransport ConnectedTransport
+               {
+                       get { return this.connectedTransport.Value; }
+                       set { this.connectedTransport.Value = value; }
+               }
+
+               /// <summary>
+               /// Gets or sets the URI of the connected transport; handleTransportFailure
+               /// resets it to null.
+               /// </summary>
+               public Uri ConnectedTransportURI
+               {
+                       get { return this.connectedTransportURI; }
+                       set { this.connectedTransportURI = value; }
+               }
+
+               /// <summary>
+               /// Gets or sets the maximum number of reconnect attempts. The backing field
+               /// has no initializer, so it starts at 0.
+               /// </summary>
+               public int MaxReconnectAttempts
+               {
+                       get { return this._maxReconnectAttempts; }
+                       set { this._maxReconnectAttempts = value; }
+               }
+
+               /// <summary>
+               /// Gets or sets the randomize flag (backing field defaults to true).
+               /// </summary>
+               public bool Randomize
+               {
+                       get { return this._randomize; }
+                       set { this._randomize = value; }
+               }
+
+               /// <summary>
+               /// Gets or sets whether a backup should optionally always be created
+               /// (backing field defaults to false).
+               /// </summary>
+               public bool Backup
+               {
+                       get { return this._backup; }
+                       set { this._backup = value; }
+               }
+
+               /// <summary>
+               /// Gets or sets the backup pool size (backing field defaults to 1).
+               /// </summary>
+               public int BackupPoolSize
+               {
+                       get { return this._backupPoolSize; }
+                       set { this._backupPoolSize = value; }
+               }
+
+               /// <summary>
+               /// Gets or sets the message tracking flag (default false); Start() copies it
+               /// to the connection state tracker.
+               /// </summary>
+               public bool TrackMessages
+               {
+                       get { return this._trackMessages; }
+                       set { this._trackMessages = value; }
+               }
+
+               /// <summary>
+               /// Gets or sets the maximum cache size (default 256); Start() copies it to
+               /// the connection state tracker.
+               /// </summary>
+               public int MaxCacheSize
+               {
+                       get { return this._maxCacheSize; }
+                       set { this._maxCacheSize = value; }
+               }
+
+               /// <summary>
+               /// Tells whether a command belongs to connection shutdown, i.e. it reports
+               /// IsShutdownInfo or is a RemoveInfo.
+               /// </summary>
+               /// <param name="command">The command to inspect; may be null.</param>
+               /// <returns>Returns true if the command is one sent when a connection is being closed.</returns>
+               private bool IsShutdownCommand(Command command)
+               {
+                       if(command == null)
+                       {
+                               return false;
+                       }
+                       return command.IsShutdownInfo || command is RemoveInfo;
+               }
+
+               public void Oneway(Command command)
+               {
+                       Exception error = null;
+                       try
+                       {
+
+                               try
+                               {
+                                       reconnectMutex.WaitOne();
+
+                                       if(IsShutdownCommand(command) && 
ConnectedTransport == null)
+                                       {
+                                               if(command.IsShutdownInfo)
+                                               {
+                                                       // Skipping send of 
ShutdownInfo command when not connected.
+                                                       return;
+                                               }
+                                               if(command is RemoveInfo)
+                                               {
+                                                       // Simulate response to 
RemoveInfo command
+                                                       Response response = new 
Response();
+                                                       response.CorrelationId 
= command.CommandId;
+                                                       onCommand(this, 
response);
+                                                       return;
+                                               }
+                                       }
+                                       // Keep trying until the message is 
sent.
+                                       for(int i = 0; !disposed; i++)
+                                       {
+                                               try
+                                               {
+                                                       // Wait for transport 
to be connected.
+                                                       ITransport transport = 
ConnectedTransport;
+                                                       while(transport == null 
&& !disposed
+                                                               && 
connectionFailure == null
+                                                               // && 
!Thread.CurrentThread.isInterrupted()
+                                                               )
+                                                       {
+                                                               
Tracer.Info("Waiting for transport to reconnect.");
+                                                               try
+                                                               {
+                                                                       // 
Release so that the reconnect task can run
+                                                                       
reconnectMutex.ReleaseMutex();
+                                                                       try
+                                                                       {
+                                                                               
// Wait for something
+                                                                               
Thread.Sleep(1000);
+                                                                       }
+                                                                       
catch(ThreadInterruptedException e)
+                                                                       {
+                                                                               
Thread.CurrentThread.Interrupt();               // KILROY - not needed
+                                                                               
Tracer.Debug("Interupted: " + e);
+                                                                       }
+                                                               }
+                                                               finally
+                                                               {
+                                                                       
reconnectMutex.WaitOne();
+                                                               }
+                                                               transport = 
ConnectedTransport;
+                                                       }
+
+                                                       if(transport == null)
+                                                       {
+                                                               // Previous 
loop may have exited due to use being
+                                                               // disposed.
+                                                               if(disposed)
+                                                               {
+                                                                       error = 
new IOException("Transport disposed.");
+                                                               }
+                                                               else 
if(connectionFailure != null)
+                                                               {
+                                                                       error = 
connectionFailure;
+                                                               }
+                                                               else
+                                                               {
+                                                                       error = 
new IOException("Unexpected failure.");
+                                                               }
+                                                               break;
+                                                       }
+
+                                                       // If it was a request 
and it was not being tracked by
+                                                       // the state tracker,
+                                                       // then hold it in the 
requestMap so that we can replay
+                                                       // it later.
+                                                       Tracked tracked = 
stateTracker.track(command);
+                                                       lock(requestMap)
+                                                       {
+                                                               if(tracked != 
null && tracked.WaitingForResponse)
+                                                               {
+                                                                       
requestMap.Add(command.CommandId, tracked);
+                                                               }
+                                                               else if(tracked 
== null && command.ResponseRequired)
+                                                               {
+                                                                       
requestMap.Add(command.CommandId, command);
+                                                               }
+                                                       }
+
+                                                       // Send the message.
+                                                       try
+                                                       {
+                                                               
transport.Oneway(command);
+                                                               
stateTracker.trackBack(command);
+                                                       }
+                                                       catch(Exception e)
+                                                       {
+
+                                                               // If the 
command was not tracked.. we will retry in
+                                                               // this method
+                                                               if(tracked == 
null)
+                                                               {
+
+                                                                       // 
since we will retry in this method.. take it
+                                                                       // out 
of the request
+                                                                       // map 
so that it is not sent 2 times on
+                                                                       // 
recovery
+                                                                       
if(command.ResponseRequired)
+                                                                       {
+                                                                               
requestMap.Remove(command.CommandId);
+                                                                       }
+
+                                                                       // 
Rethrow the exception so it will handled by
+                                                                       // the 
outer catch
+                                                                       throw e;
+                                                               }
+
+                                                       }
+
+                                                       return;
+
+                                               }
+                                               catch(Exception e)
+                                               {
+                                                       Tracer.Debug("Send 
Oneway attempt: " + i + " failed.");
+                                                       
handleTransportFailure(e);
+                                               }
+                                       }
+                               }
+                               finally
+                               {
+                                       reconnectMutex.ReleaseMutex();
+                               }
+                       }
+                       catch(ThreadInterruptedException e)
+                       {
+                               e.GetType();
+                               // Some one may be trying to stop our thread.
+                               Thread.CurrentThread.Interrupt();
+                               throw new ThreadInterruptedException();
+                       }
+                       if(!disposed)
+                       {
+                               if(error != null)
+                               {
+                                       throw error;
+                               }
+                       }
+               }
+
+               /*
+               public FutureResponse asyncRequest(Object command, 
ResponseCallback responseCallback) {
+               throw new AssertionError("Unsupported Method");
+               }
+               */
+
+               /// <summary>
+               /// Untyped synchronous request. Not supported by this transport:
+               /// the call always throws.
+               /// </summary>
+               /// <param name="command">Ignored.</param>
+               /// <returns>Never returns.</returns>
+               /// <exception cref="ApplicationException">Always thrown.</exception>
+               public Object Request(Object command)
+               {
+                       String message = "FailoverTransport does not support Request(Object)";
+                       throw new ApplicationException(message);
+               }
+
+               /// <summary>
+               /// Untyped synchronous request with a timeout. Not supported by this
+               /// transport: the call always throws.
+               /// </summary>
+               /// <param name="command">Ignored.</param>
+               /// <param name="timeout">Ignored.</param>
+               /// <returns>Never returns.</returns>
+               /// <exception cref="ApplicationException">Always thrown.</exception>
+               public Object Request(Object command, int timeout)
+               {
+                       String message = "FailoverTransport does not support Request(Object, Int)";
+                       throw new ApplicationException(message);
+               }
+
+               /// <summary>
+               /// Adds every URI in <paramref name="u"/> that is not already present
+               /// in the URI list, then triggers a reconnect.
+               /// </summary>
+               /// <param name="u">URIs to add.</param>
+               public void add(Uri[] u)
+               {
+                       lock(uris)
+                       {
+                               foreach(Uri candidate in u)
+                               {
+                                       // Skip URIs we already know about.
+                                       if(uris.Contains(candidate))
+                                       {
+                                               continue;
+                                       }
+                                       uris.Add(candidate);
+                               }
+                       }
+
+                       // The list is updated; let the reconnect logic take it from here.
+                       Reconnect();
+               }
+
+               /// <summary>
+               /// Removes every URI in <paramref name="u"/> from the URI list, then
+               /// triggers a reconnect.
+               /// </summary>
+               /// <param name="u">URIs to remove.</param>
+               public void remove(Uri[] u)
+               {
+                       lock(uris)
+                       {
+                               foreach(Uri unwanted in u)
+                               {
+                                       uris.Remove(unwanted);
+                               }
+                       }
+
+                       Reconnect();
+               }
+
+               /// <summary>
+               /// Parses <paramref name="u"/> as a URI, adds it to the URI list if it
+               /// is not already present, and triggers a reconnect. Any exception is
+               /// logged through Tracer.Error and not propagated.
+               /// </summary>
+               /// <param name="u">Textual form of the URI to add.</param>
+               public void add(String u)
+               {
+                       try
+                       {
+                               Uri parsed = new Uri(u);
+
+                               lock(uris)
+                               {
+                                       bool alreadyKnown = uris.Contains(parsed);
+                                       if(!alreadyKnown)
+                                       {
+                                               uris.Add(parsed);
+                                       }
+                               }
+
+                               Reconnect();
+                       }
+                       catch(Exception ex)
+                       {
+                               // NOTE(review): this handler also covers exceptions thrown by
+                               // Reconnect(), which are then reported as a parse failure.
+                               Tracer.Error("Failed to parse URI: " + u + " because " + ex.Message);
+                       }
+               }
+
+               /// <summary>
+               /// Wakes the reconnect task, creating it first if it does not exist
+               /// yet. Does nothing except log when the transport has not been
+               /// started. Runs while holding reconnectMutex.
+               /// </summary>
+               public void Reconnect()
+               {
+                       try
+                       {
+                               reconnectMutex.WaitOne();
+
+                               if(!started)
+                               {
+                                       Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
+                                       return;         // the finally block still releases the mutex
+                               }
+
+                               if(reconnectTask == null)
+                               {
+                                       Tracer.Debug("Creating reconnect task");
+                                       String workerName = "ActiveMQ Failover Worker: " + this.GetHashCode().ToString();
+                                       reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(
+                                               new FailoverTask(this), workerName);
+                               }
+
+                               Tracer.Debug("Waking up reconnect task");
+                               try
+                               {
+                                       reconnectTask.wakeup();
+                               }
+                               catch(ThreadInterruptedException)
+                               {
+                                       // Re-assert the interrupt on the current thread.
+                                       Thread.CurrentThread.Interrupt();
+                               }
+                       }
+                       finally
+                       {
+                               reconnectMutex.ReleaseMutex();
+                       }
+               }
+
+               /// <summary>
+               /// Builds a fresh list of URIs to try, in the order they should be
+               /// attempted. The list is a copy of the URI list, optionally shuffled
+               /// when Randomize is set. If the URI recorded in
+               /// failedConnectTransportURI is part of the list, it is moved to the
+               /// end so it is tried last.
+               /// </summary>
+               private List<Uri> ConnectList
+               {
+                       get
+                       {
+                               // add()/remove() mutate the URI list under lock(uris); take the
+                               // same lock while copying so the copy cannot observe a list
+                               // that is being modified.
+                               List<Uri> l;
+                               lock(uris)
+                               {
+                                       l = new List<Uri>(uris);
+                               }
+
+                               // Read the field once so the value removed here is the same
+                               // one appended below, even if another thread clears it.
+                               Uri lastFailed = failedConnectTransportURI;
+                               bool removed = false;
+                               if(lastFailed != null)
+                               {
+                                       removed = l.Remove(lastFailed);
+                               }
+
+                               if(Randomize)
+                               {
+                                       // Fisher-Yates shuffle: each permutation is equally likely.
+                                       // The default Random constructor gives a wider seed range
+                                       // than the 0-999 millisecond value.
+                                       Random r = new Random();
+                                       for(int i = l.Count - 1; i > 0; i--)
+                                       {
+                                               int p = r.Next(i + 1);
+                                               Uri t = l[p];
+                                               l[p] = l[i];
+                                               l[i] = t;
+                                       }
+                               }
+
+                               if(removed)
+                               {
+                                       // Try the URI that failed last time only after all others.
+                                       l.Add(lastFailed);
+                               }
+                               return l;
+                       }
+               }
+
+               /// <summary>
+               /// Brings a newly obtained transport up to date: starts it, sends a
+               /// ConnectionControl with FaultTolerant set, lets the state tracker
+               /// restore onto it, and then re-sends every command currently held
+               /// in the request map.
+               /// </summary>
+               /// <param name="t">The transport to start and replay onto.</param>
+               protected void restoreTransport(ITransport t)
+               {
+                       t.Start();
+
+                       // Tell the broker that this is a fault tolerant client.
+                       ConnectionControl control = new ConnectionControl();
+                       control.FaultTolerant = true;
+                       t.Oneway(control);
+
+                       stateTracker.DoRestore(t);
+
+                       // Snapshot the request map under its lock, then replay from the
+                       // snapshot outside the lock.
+                       Dictionary<int, Command> pending;
+                       lock(requestMap)
+                       {
+                               pending = new Dictionary<int, Command>(requestMap);
+                       }
+
+                       foreach(KeyValuePair<int, Command> entry in pending)
+                       {
+                               t.Oneway(entry.Value);
+                       }
+               }
+
+               /// <summary>
+               /// Gets or sets the flag stored in _useExponentialBackOff, which
+               /// decides whether the reconnect delay grows after failed attempts.
+               /// </summary>
+               public bool UseExponentialBackOff
+               {
+                       get { return _useExponentialBackOff; }
+                       set { _useExponentialBackOff = value; }
+               }
+
+               /// <summary>
+               /// Returns the connected transport URI as text, or "unconnected" when
+               /// no URI is currently set.
+               /// </summary>
+               public override String ToString()
+               {
+                       if(ConnectedTransportURI == null)
+                       {
+                               return "unconnected";
+                       }
+                       return ConnectedTransportURI.ToString();
+               }
+
+               /// <summary>
+               /// Returns the RemoteAddress of the connected transport when that
+               /// transport is itself a FailoverTransport; otherwise returns null.
+               /// </summary>
+               public String RemoteAddress
+               {
+                       get
+                       {
+                               // NOTE(review): the cast only succeeds for a nested
+                               // FailoverTransport, so any other transport type yields null.
+                               // Confirm this is intended.
+                               FailoverTransport inner = ConnectedTransport as FailoverTransport;
+                               if(inner == null)
+                               {
+                                       return null;
+                               }
+                               return inner.RemoteAddress;
+                       }
+               }
+
+               /// <summary>
+               /// Always true for this transport.
+               /// </summary>
+               public bool IsFaultTolerant
+               {
+                       get { return true; }
+               }
+
+               /// <summary>
+               /// Performs one reconnect pass.
+               ///
+               /// While holding reconnectMutex: returns false immediately if a
+               /// transport is already connected, this object is disposed, or a
+               /// permanent connectionFailure has been recorded. Otherwise it first
+               /// tries the head of the backup pool (when Backup is set and the pool
+               /// is not empty), then each URI from ConnectList in order. The first
+               /// transport that starts successfully becomes ConnectedTransport and
+               /// the method returns false.
+               ///
+               /// If nothing connected and MaxReconnectAttempts is positive and has
+               /// been reached, the last failure is stored in connectionFailure,
+               /// reported through onException, and the method returns false.
+               ///
+               /// Otherwise, after releasing reconnectMutex, the method sleeps for
+               /// ReconnectDelay milliseconds (holding sleepMutex), optionally grows
+               /// the delay, and returns !disposed.
+               /// </summary>
+               /// <returns>
+               /// false when connected, disposed, or permanently failed; otherwise
+               /// !disposed. Presumably true asks the task runner to call again;
+               /// confirm against the caller.
+               /// </returns>
+               bool doReconnect()
+               {
+                       Exception failure = null;
+                       try
+                       {
+                               reconnectMutex.WaitOne();
+
+                               // NOTE(review): this if has an empty body and no effect.
+                               if(disposed || connectionFailure != null)
+                               {
+                               }
+
+                               if(ConnectedTransport != null || disposed || 
connectionFailure != null)
+                               {
+                                       return false;
+                               }
+                               else
+                               {
+                                       List<Uri> connectList = ConnectList;
+                                       if(connectList.Count == 0)
+                                       {
+                                               failure = new IOException("No 
uris available to connect to.");
+                                       }
+                                       else
+                                       {
+                                               // Without exponential back-off every pass starts
+                                               // from the initial delay.
+                                               if(!UseExponentialBackOff)
+                                               {
+                                                       ReconnectDelay = 
InitialReconnectDelay;
+                                               }
+                                               // First choice: promote a backup transport from
+                                               // the pool, if there is one.
+                                               try
+                                               {
+                                                       backupMutex.WaitOne();
+                                                       if(Backup && 
backups.Count != 0)
+                                                       {
+                                                               BackupTransport 
bt = backups[0];
+                                                               
backups.RemoveAt(0);
+                                                               ITransport t = 
bt.Transport;
+                                                               Uri uri = 
bt.Uri;
+                                                               t.Command = new 
CommandHandler(onCommand);
+                                                               t.Exception = 
new ExceptionHandler(onException);
+                                                               try
+                                                               {
+                                                                       
if(started)
+                                                                       {
+                                                                               
restoreTransport(t);
+                                                                       }
+                                                                       
ReconnectDelay = InitialReconnectDelay;
+                                                                       
failedConnectTransportURI = null;
+                                                                       
ConnectedTransportURI = uri;
+                                                                       
ConnectedTransport = t;
+                                                                       
connectFailures = 0;
+                                                                       
Tracer.Info("Successfully reconnected to backup " + uri);
+                                                                       // NOTE(review): unlike the direct-connect path
+                                                                       // below, this path does not set connected = true.
+                                                                       return 
false;
+                                                               }
+                                                               catch(Exception 
e)
+                                                               {
+                                                                       // The exception detail is discarded here; only a
+                                                                       // fixed message is logged and the URI loop below
+                                                                       // runs next.
+                                                                       
e.GetType();
+                                                                       
Tracer.Debug("Backup transport failed");
+                                                               }
+                                                       }
+                                               }
+                                               finally
+                                               {
+                                                       
backupMutex.ReleaseMutex();
+                                               }
+
+                                               // Second choice: connect directly to each
+                                               // candidate URI in turn.
+                                               foreach(Uri uri in connectList)
+                                               {
+                                                       if(ConnectedTransport 
!= null || disposed)
+                                                       {
+                                                               break;
+                                                       }
+
+                                                       try
+                                                       {
+                                                               
Tracer.Debug("Attempting connect to: " + uri);
+                                                               ITransport t = 
TransportFactory.CompositeConnect(uri);
+                                                               t.Command = new 
CommandHandler(onCommand);
+                                                               t.Exception = 
new ExceptionHandler(onException);
+                                                               t.Start();
+
+                                                               // NOTE(review): restoreTransport() also calls
+                                                               // t.Start(), so a started transport is
+                                                               // started twice on this path.
+                                                               if(started)
+                                                               {
+                                                                       
restoreTransport(t);
+                                                               }
+
+                                                               
Tracer.Debug("Connection established");
+                                                               ReconnectDelay 
= InitialReconnectDelay;
+                                                               
ConnectedTransportURI = uri;
+                                                               
ConnectedTransport = t;
+                                                               connectFailures 
= 0;
+
+                                                               
if(firstConnection)
+                                                               {
+                                                                       
firstConnection = false;
+                                                                       
Tracer.Info("Successfully connected to " + uri);
+                                                               }
+                                                               else
+                                                               {
+                                                                       
Tracer.Info("Successfully reconnected to " + uri);
+                                                               }
+                                                               connected = 
true;
+                                                               return false;
+                                                       }
+                                                       catch(Exception e)
+                                                       {
+                                                               // Remember the most recent failure and move on
+                                                               // to the next URI.
+                                                               failure = e;
+                                                               
Tracer.Debug("Connect fail to: " + uri + ", reason: " + e);
+                                                       }
+                                               }
+                                       }
+                               }
+
+                               // Reached only when no transport was connected in this pass.
+                               // A non-positive MaxReconnectAttempts disables the limit.
+                               if(MaxReconnectAttempts > 0 && 
++connectFailures >= MaxReconnectAttempts)
+                               {
+                                       Tracer.Error("Failed to connect to 
transport after: " + connectFailures + " attempt(s)");
+                                       connectionFailure = failure;
+
+                                       onException(this, connectionFailure);
+
+                                       return false;
+                               }
+                       }
+                       finally
+                       {
+                               reconnectMutex.ReleaseMutex();
+                       }
+                       // reconnectMutex is no longer held from here on.
+                       if(!disposed)
+                       {
+
+                               Tracer.Debug("Waiting " + ReconnectDelay + " ms 
before attempting connection. ");
+                               try
+                               {
+                                       sleepMutex.WaitOne();
+                                       try
+                                       {
+                                               Thread.Sleep(ReconnectDelay);
+                                       }
+                                       catch(ThreadInterruptedException e)
+                                       {
+                                               e.GetType();
+                                               
Thread.CurrentThread.Interrupt();
+                                       }
+                               }
+                               finally
+                               {
+                                       sleepMutex.ReleaseMutex();
+                               }
+
+                               if(UseExponentialBackOff)
+                               {
+                                       // Exponential increment of reconnect 
delay.
+                                       ReconnectDelay *= 
ReconnectDelayExponent;
+                                       if(ReconnectDelay > MaxReconnectDelay)
+                                       {
+                                               ReconnectDelay = 
MaxReconnectDelay;
+                                       }
+                               }
+                       }
+                       return !disposed;
+               }
+
+
+               /// <summary>
+               /// Tops up the pool of backup transports while holding backupMutex.
+               /// Does nothing when this object is disposed, Backup is off, or the
+               /// pool already holds BackupPoolSize entries. Otherwise it removes
+               /// disposed entries from the pool and then, for each candidate URI
+               /// other than the currently connected one, connects and starts a
+               /// transport and adds it to the pool until the pool is full.
+               /// Failures for individual URIs are logged and skipped.
+               /// </summary>
+               /// <returns>Always false.</returns>
+               bool buildBackups()
+               {
+                       try
+                       {
+                               backupMutex.WaitOne();
+                               if(!disposed && Backup && backups.Count < BackupPoolSize)
+                               {
+                                       List<Uri> connectList = ConnectList;
+
+                                       // Drop disposed backups. Walk the list backwards by index:
+                                       // removing inside a foreach over the same list throws
+                                       // InvalidOperationException.
+                                       for(int i = backups.Count - 1; i >= 0; i--)
+                                       {
+                                               if(backups[i].Disposed)
+                                               {
+                                                       backups.RemoveAt(i);
+                                               }
+                                       }
+
+                                       foreach(Uri uri in connectList)
+                                       {
+                                               // Stop once the pool is full. The original test was
+                                               // inverted and left the loop while the pool still
+                                               // had room.
+                                               if(backups.Count >= BackupPoolSize)
+                                               {
+                                                       break;
+                                               }
+
+                                               // Backups are only built while connected, and never
+                                               // for the URI that is currently in use.
+                                               if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
+                                               {
+                                                       try
+                                                       {
+                                                               BackupTransport bt = new BackupTransport(this);
+                                                               bt.Uri = uri;
+                                                               if(!backups.Contains(bt))
+                                                               {
+                                                                       ITransport t = TransportFactory.CompositeConnect(uri);
+                                                                       t.Command = new CommandHandler(bt.onCommand);
+                                                                       t.Exception = new ExceptionHandler(bt.onException);
+                                                                       t.Start();
+                                                                       bt.Transport = t;
+                                                                       backups.Add(bt);
+                                                               }
+                                                       }
+                                                       catch(Exception e)
+                                                       {
+                                                               // Best effort: record why this backup could not
+                                                               // be built and carry on with the next URI.
+                                                               Tracer.Debug("Failed to build backup " + e);
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+                       finally
+                       {
+                               backupMutex.ReleaseMutex();
+                       }
+                       return false;
+               }
+
+               /// <summary>
+               /// Gets the current value of the disposed flag.
+               /// </summary>
+               public bool IsDisposed
+               {
+                       get { return disposed; }
+               }
+
+               /// <summary>
+               /// Gets the current value of the connected flag.
+               /// </summary>
+               public bool Connected
+               {
+                       get { return connected; }
+               }
+
+               /// <summary>
+               /// Adds a single URI through add(Uri[]), which also triggers a
+               /// reconnect.
+               /// </summary>
+               /// <param name="uri">The URI to add.</param>
+               public void Reconnect(Uri uri)
+               {
+                       Uri[] single = new Uri[1];
+                       single[0] = uri;
+                       add(single);
+               }
+
+               /// <summary>
+               /// Asynchronous request. Not implemented by this transport: the call
+               /// always throws.
+               /// </summary>
+               /// <param name="command">Ignored.</param>
+               /// <returns>Never returns.</returns>
+               /// <exception cref="ApplicationException">Always thrown.</exception>
+               public FutureResponse AsyncRequest(Command command)
+               {
+                       String message = "FailoverTransport does not implement AsyncRequest(Command)";
+                       throw new ApplicationException(message);
+               }
+
+               /// <summary>
+               /// Synchronous request. Not implemented by this transport: the call
+               /// always throws.
+               /// </summary>
+               /// <param name="command">Ignored.</param>
+               /// <returns>Never returns.</returns>
+               /// <exception cref="ApplicationException">Always thrown.</exception>
+               public Response Request(Command command)
+               {
+                       String message = "FailoverTransport does not implement Request(Command)";
+                       throw new ApplicationException(message);
+               }
+
+               /// <summary>
+               /// Synchronous request with a timeout. Not implemented by this
+               /// transport: the call always throws.
+               /// </summary>
+               /// <param name="command">Ignored.</param>
+               /// <param name="ts">Ignored.</param>
+               /// <returns>Never returns.</returns>
+               /// <exception cref="ApplicationException">Always thrown.</exception>
+               public Response Request(Command command, TimeSpan ts)
+               {
+                       String message = "FailoverTransport does not implement Request(Command, TimeSpan)";
+                       throw new ApplicationException(message);
+               }
+
+               /// <summary>
+               /// Gets or sets the command handler stored in _commandHandler.
+               /// </summary>
+               public CommandHandler Command
+               {
+                       get { return _commandHandler; }
+                       set { _commandHandler = value; }
+               }
+
+               /// <summary>
+               /// Gets or sets the exception handler stored in _exceptionHandler.
+               /// </summary>
+               public ExceptionHandler Exception
+               {
+                       get { return _exceptionHandler; }
+                       set { _exceptionHandler = value; }
+               }
+
+               /// <summary>
+               /// Gets the current value of the started flag.
+               /// </summary>
+               public bool IsStarted
+               {
+                       get { return started; }
+               }
+
+               /// <summary>
+               /// Disposes this transport by calling Dispose(true), then tells the
+               /// garbage collector that finalization is no longer needed.
+               /// </summary>
+               public void Dispose()
+               {
+                       this.Dispose(true);
+
+                       // Explicitly disposed, so the finalizer has nothing left to do.
+                       GC.SuppressFinalize(this);
+               }
+
+               /// <summary>
+               /// Marks this transport as disposed. No resources are released here;
+               /// the only effect is setting the disposed flag.
+               /// </summary>
+               /// <param name="disposing">
+               /// Currently has no effect: both values lead to the same result.
+               /// </param>
+               public void Dispose(bool disposing)
+               {
+                       // Nothing to clean up for either value of 'disposing' yet.
+                       disposed = true;
+               }
+
+               /// <summary>
+               /// Orders this transport relative to another FailoverTransport by
+               /// their _id values.
+               /// </summary>
+               /// <param name="o">The object to compare with.</param>
+               /// <returns>
+               /// A negative value, zero, or a positive value when this transport's
+               /// _id is less than, equal to, or greater than the other's.
+               /// </returns>
+               /// <exception cref="ArgumentException">
+               /// <paramref name="o"/> is null or not a FailoverTransport.
+               /// </exception>
+               public int CompareTo(Object o)
+               {
+                       // A single cast replaces the previous 'is' followed by 'as'.
+                       FailoverTransport other = o as FailoverTransport;
+                       if(other == null)
+                       {
+                               throw new ArgumentException();
+                       }
+
+                       // Compare rather than subtract: a difference of two ids can
+                       // overflow and come out with the wrong sign.
+                       return this._id.CompareTo(other._id);
+               }
+       }
+}

Added: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs?rev=707747&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
 Fri Oct 24 14:10:22 2008
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Failover
+{
+       /// <summary>
+       /// Transport factory that parses a location as a composite URI and builds
+       /// a FailoverTransport from the parsed parameters and components.
+       /// </summary>
+       public class FailoverTransportFactory : ITransportFactory
+       {
+               /// <summary>
+               /// Builds the failover transport for the location and wraps it in a
+               /// MutexTransport and then in a ResponseCorrelator.
+               /// </summary>
+               /// <param name="location">URI handed to URISupport.parseComposite.</param>
+               /// <returns>The ResponseCorrelator wrapping the new transport.</returns>
+               public ITransport CreateTransport(Uri location)
+               {
+                       ITransport failover =
+                               CreateTransport(URISupport.parseComposite(location));
+                       ITransport mutexed = new MutexTransport(failover);
+                       return new ResponseCorrelator(mutexed);
+               }
+
+               /// <summary>
+               /// Builds the failover transport for the location without adding the
+               /// MutexTransport and ResponseCorrelator wrappers.
+               /// </summary>
+               /// <param name="location">URI handed to URISupport.parseComposite.</param>
+               /// <returns>The unwrapped failover transport.</returns>
+               public ITransport CompositeConnect(Uri location)
+               {
+                       return CreateTransport(URISupport.parseComposite(location));
+               }
+
+               /// <summary>
+               /// Creates a FailoverTransport configured from the composite data's
+               /// parameters and passes the composite data's components to its
+               /// <c>add</c> method.
+               /// </summary>
+               /// <param name="compositData">Parsed composite URI data.</param>
+               /// <returns>The configured failover transport.</returns>
+               public ITransport CreateTransport(URISupport.CompositeData compositData)
+               {
+                       StringDictionary settings = compositData.Parameters;
+                       FailoverTransport failover = CreateTransport(settings);
+                       failover.add(compositData.Components);
+                       return failover;
+               }
+
+               /// <summary>
+               /// Creates a new FailoverTransport and hands it, together with the given
+               /// parameters and an empty string, to URISupport.SetProperties.
+               /// </summary>
+               /// <param name="parameters">Option names and values for the transport.</param>
+               /// <returns>The new failover transport.</returns>
+               public FailoverTransport CreateTransport(StringDictionary parameters)
+               {
+                       FailoverTransport failover = new FailoverTransport();
+                       URISupport.SetProperties(failover, parameters, "");
+                       return failover;
+               }
+       }
+}

Added: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs?rev=707747&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs
 Fri Oct 24 14:10:22 2008
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.ActiveMQ.Transport
+{
+       /// <summary>
+       /// A transport that additionally accepts arrays of URIs to add or remove.
+       /// NOTE(review): presumably these are the candidate broker URIs of a
+       /// composite transport such as failover - confirm against implementations.
+       /// </summary>
+       public interface ICompositeTransport : ITransport
+       {
+               /// <summary>
+               /// Adds the given URIs to this transport.
+               /// </summary>
+               /// <param name="uris">The URIs to add.</param>
+               void add(Uri[] uris);
+
+               /// <summary>
+               /// Removes the given URIs from this transport.
+               /// </summary>
+               /// <param name="uris">The URIs to remove.</param>
+               void remove(Uri[] uris);
+       }
+}
+

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
 Fri Oct 24 14:10:22 2008
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS;
+
 using System;
+using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ.Transport
 {
@@ -26,11 +26,11 @@
        /// <summary>
        /// Represents the logical networking transport layer.
        /// </summary>
-       public interface ITransport : IStartable, IDisposable
-    {
-        void Oneway(Command command);
-        
-        FutureResponse AsyncRequest(Command command);
+       public interface ITransport : IStartable, IDisposable, IStoppable
+       {
+               void Oneway(Command command);
+
+               FutureResponse AsyncRequest(Command command);
 
                TimeSpan RequestTimeout
                {
@@ -38,20 +38,25 @@
                        set;
                }
 
-        Response Request(Command command);
+               Response Request(Command command);
                Response Request(Command command, TimeSpan timeout);
-        
-        CommandHandler Command
+
+               CommandHandler Command
+               {
+                       get;
+                       set;
+               }
+
+               ExceptionHandler Exception
                {
-            get;
-            set;
-        }
-               
-        ExceptionHandler Exception
+                       get;
+                       set;
+               }
+
+               bool IsDisposed
                {
-            get;
-            set;
-        }
-    }
+                       get;
+               }
+       }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs
 Fri Oct 24 14:10:22 2008
@@ -20,7 +20,8 @@
 namespace Apache.NMS.ActiveMQ.Transport
 {
        public interface ITransportFactory
-    {
+       {
                ITransport CreateTransport(Uri location);
+               ITransport CompositeConnect(Uri location);
        }
 }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
 Fri Oct 24 14:10:22 2008
@@ -14,14 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.OpenWire;
-using Apache.NMS.ActiveMQ.Transport;
-using Apache.NMS.Util;
+
 using System;
 using System.IO;
 using System.Net.Sockets;
 using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.OpenWire;
+using Apache.NMS.Util;
 
 namespace Apache.NMS.ActiveMQ.Transport.Tcp
 {
@@ -37,6 +37,7 @@
                private BinaryWriter socketWriter;
                private Thread readThread;
                private bool started;
+               private bool disposed = false;
                private AtomicBoolean closed = new AtomicBoolean(false);
                private volatile bool seenShutdown;
                private TimeSpan maxWait = 
TimeSpan.FromMilliseconds(Timeout.Infinite);
@@ -63,15 +64,15 @@
                {
                        lock(myLock)
                        {
-                               if (!started)
+                               if(!started)
                                {
-                                       if (null == commandHandler)
+                                       if(null == commandHandler)
                                        {
                                                throw new 
InvalidOperationException(
                                                                "command cannot 
be null when Start is called.");
                                        }
 
-                                       if (null == exceptionHandler)
+                                       if(null == exceptionHandler)
                                        {
                                                throw new 
InvalidOperationException(
                                                                "exception 
cannot be null when Start is called.");
@@ -179,6 +180,11 @@
                        throw new NotImplementedException("Use a 
ResponseCorrelator if you want to issue Request calls");
                }
 
+               /// <summary>
+               /// Stops this transport by delegating to <c>Close()</c>.
+               /// </summary>
+               public void Stop()
+               {
+                       Close();
+               }
+
                public void Close()
                {
                        if(closed.CompareAndSet(false, true))
@@ -235,9 +241,9 @@
                                        {
                                                if(Thread.CurrentThread != 
readThread
 #if !NETCF
-                                                       && readThread.IsAlive
+ && readThread.IsAlive
 #endif
-                                                       )
+)
                                                {
                                                        TimeSpan waitTime;
 
@@ -273,6 +279,15 @@
                protected void Dispose(bool disposing)
                {
                        Close();
+                       disposed = true;
+               }
+
+               /// <summary>
+               /// Gets the <c>disposed</c> flag, which starts as false and is set to
+               /// true by <c>Dispose(bool)</c>.
+               /// </summary>
+               public bool IsDisposed
+               {
+                       get { return disposed; }
                }
 
                public void ReadLoop()
@@ -337,7 +352,7 @@
                        set { this.commandHandler = value; }
                }
 
-               public  ExceptionHandler Exception
+               public ExceptionHandler Exception
                {
                        get { return exceptionHandler; }
                        set { this.exceptionHandler = value; }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
 Fri Oct 24 14:10:22 2008
@@ -16,14 +16,12 @@
  */
 
 using System;
+using System.Collections.Specialized;
 using System.Net;
 using System.Net.Sockets;
-using System.Collections.Specialized;
 using Apache.NMS.ActiveMQ.OpenWire;
 using Apache.NMS.ActiveMQ.Transport.Stomp;
-using Apache.NMS;
 using Apache.NMS.Util;
-using System.Threading;
 
 namespace Apache.NMS.ActiveMQ.Transport.Tcp
 {
@@ -103,7 +101,7 @@
 
                #region ITransportFactory Members
 
-               public ITransport CreateTransport(Uri location)
+               public ITransport CompositeConnect(Uri location)
                {
                        // Extract query parameters from broker Uri
                        StringDictionary map = 
URISupport.ParseQuery(location.Query);
@@ -136,6 +134,15 @@
                                transport = new WireFormatNegotiator(transport, 
(OpenWireFormat) wireformat);
                        }
 
+                       transport.RequestTimeout = this.requestTimeout;
+
+                       return transport;
+               }
+
+               public ITransport CreateTransport(Uri location)
+               {
+                       ITransport transport = CompositeConnect(location);
+
                        transport = new MutexTransport(transport);
                        transport = new ResponseCorrelator(transport);
                        transport.RequestTimeout = this.requestTimeout;

Added: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs?rev=707747&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs
 Fri Oct 24 14:10:22 2008
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+
+using Apache.NMS.ActiveMQ.Transport.Failover;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
+
+namespace Apache.NMS.ActiveMQ.Transport
+{
+       /// <summary>
+       /// Static registry that maps a URI scheme to the ITransportFactory
+       /// registered for it and forwards transport creation to that factory.
+       /// </summary>
+       public class TransportFactory
+       {
+               // Filled once by the static constructor and only read afterwards.
+               private static readonly Dictionary<String, ITransportFactory> TRANSPORT_FACTORYS =
+                       new Dictionary<String, ITransportFactory>();
+
+               static TransportFactory()
+               {
+                       TRANSPORT_FACTORYS.Add("tcp", new TcpTransportFactory());
+                       TRANSPORT_FACTORYS.Add("failover", new FailoverTransportFactory());
+               }
+
+               /// <summary>
+               /// Creates a normal transport using the factory registered for the
+               /// scheme of the given location.
+               /// </summary>
+               /// <param name="location">The URI whose scheme selects the factory.</param>
+               /// <returns>the transport</returns>
+               public static ITransport CreateTransport(Uri location)
+               {
+                       ITransportFactory tf = findTransportFactory(location);
+                       return tf.CreateTransport(location);
+               }
+
+               /// <summary>
+               /// Calls CompositeConnect on the factory registered for the scheme of
+               /// the given location.
+               /// </summary>
+               /// <param name="location">The URI whose scheme selects the factory.</param>
+               /// <returns>The transport returned by that factory.</returns>
+               public static ITransport CompositeConnect(Uri location)
+               {
+                       ITransportFactory tf = findTransportFactory(location);
+                       return tf.CompositeConnect(location);
+               }
+
+               /// <summary>
+               /// Looks up the factory registered for the scheme of the location.
+               /// </summary>
+               /// <param name="location">The URI whose scheme is looked up.</param>
+               /// <returns>The registered factory; never null.</returns>
+               /// <exception cref="IOException">The location has no scheme.</exception>
+               /// <exception cref="ApplicationException">
+               /// No factory is registered for the scheme.
+               /// </exception>
+               private static ITransportFactory findTransportFactory(Uri location)
+               {
+                       String scheme = location.Scheme;
+                       if(String.IsNullOrEmpty(scheme))
+                       {
+                               throw new IOException("Transport scheme not specified: [" + location + "]");
+                       }
+
+                       // The dictionary indexer throws KeyNotFoundException for an unknown
+                       // key instead of returning null, so the intended error below was
+                       // unreachable; TryGetValue makes the lookup failure explicit.
+                       ITransportFactory tf;
+                       if(!TRANSPORT_FACTORYS.TryGetValue(scheme, out tf) || tf == null)
+                       {
+                               throw new ApplicationException("Transport Factory for " + scheme + " does not exist.");
+                       }
+                       return tf;
+               }
+       }
+}

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
 Fri Oct 24 14:10:22 2008
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.Transport;
+
 using System;
+using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ.Transport
 {
@@ -28,6 +28,7 @@
                protected readonly ITransport next;
                protected CommandHandler commandHandler;
                protected ExceptionHandler exceptionHandler;
+               private bool disposed = false;
 
                public TransportFilter(ITransport next)
                {
@@ -107,12 +108,12 @@
                {
                        if(commandHandler == null)
                        {
-                               throw new InvalidOperationException ("command 
cannot be null when Start is called.");
+                               throw new InvalidOperationException("command 
cannot be null when Start is called.");
                        }
 
                        if(exceptionHandler == null)
                        {
-                               throw new InvalidOperationException ("exception 
cannot be null when Start is called.");
+                               throw new InvalidOperationException("exception 
cannot be null when Start is called.");
                        }
 
                        this.next.Start();
@@ -141,6 +142,15 @@
                        {
                                this.next.Dispose();
                        }
+                       disposed = true;
+               }
+
+               /// <summary>
+               /// Gets the current value of the <c>disposed</c> field, which is
+               /// initialised to false.
+               /// </summary>
+               public bool IsDisposed
+               {
+                       get { return disposed; }
                }
 
                public CommandHandler Command
@@ -149,12 +159,15 @@
                        set { this.commandHandler = value; }
                }
 
-               public  ExceptionHandler Exception
+               public ExceptionHandler Exception
                {
                        get { return exceptionHandler; }
                        set { this.exceptionHandler = value; }
                }
 
+               /// <summary>
+               /// Stops this filter by stopping the next transport in the chain, in
+               /// the same way Start forwards to it. Subclasses may override this.
+               /// </summary>
+               public virtual void Stop()
+               {
+                       // An empty body left the wrapped transport running when Stop was
+                       // called on the outermost filter.
+                       this.next.Stop();
+               }
        }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj 
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj 
Fri Oct 24 14:10:22 2008
@@ -237,6 +237,7 @@
     <Compile Include="src\main\csharp\Commands\PartialCommand.cs">
       <SubType>Code</SubType>
     </Compile>
+    <Compile Include="src\main\csharp\Commands\ProducerAck.cs" />
     <Compile Include="src\main\csharp\Commands\ProducerId.cs">
       <SubType>Code</SubType>
     </Compile>
@@ -303,6 +304,7 @@
       <SubType>Code</SubType>
     </Compile>
     <Compile Include="src\main\csharp\DispatchingThread.cs" />
+    <Compile Include="src\main\csharp\IOException.cs" />
     <Compile Include="src\main\csharp\ISynchronization.cs">
       <SubType>Code</SubType>
     </Compile>
@@ -700,12 +702,32 @@
     <Compile Include="src\main\csharp\Session.cs">
       <SubType>Code</SubType>
     </Compile>
+    <Compile Include="src\main\csharp\State\CommandVisitorAdapter.cs" />
+    <Compile Include="src\main\csharp\State\ConnectionState.cs" />
+    <Compile Include="src\main\csharp\State\ConnectionStateTracker.cs" />
+    <Compile Include="src\main\csharp\State\ConsumerState.cs" />
+    <Compile Include="src\main\csharp\State\ICommandVisitor.cs" />
+    <Compile Include="src\main\csharp\State\ProducerState.cs" />
+    <Compile Include="src\main\csharp\State\SessionState.cs" />
+    <Compile Include="src\main\csharp\State\SynchronizedObjects.cs" />
+    <Compile Include="src\main\csharp\State\ThreadSimulator.cs" />
+    <Compile Include="src\main\csharp\State\Tracked.cs" />
+    <Compile Include="src\main\csharp\State\TransactionState.cs" />
+    <Compile Include="src\main\csharp\Threads\DefaultThreadPools.cs" />
+    <Compile Include="src\main\csharp\Threads\PooledTaskRunner.cs" />
+    <Compile Include="src\main\csharp\Threads\Task.cs" />
+    <Compile Include="src\main\csharp\Threads\TaskRunner.cs" />
+    <Compile Include="src\main\csharp\Threads\TaskRunnerFactory.cs" />
     <Compile Include="src\main\csharp\TransactionContext.cs">
       <SubType>Code</SubType>
     </Compile>
+    <Compile Include="src\main\csharp\Transport\Failover\BackupTransport.cs" />
+    <Compile Include="src\main\csharp\Transport\Failover\FailoverTransport.cs" 
/>
+    <Compile 
Include="src\main\csharp\Transport\Failover\FailoverTransportFactory.cs" />
     <Compile Include="src\main\csharp\Transport\FutureResponse.cs">
       <SubType>Code</SubType>
     </Compile>
+    <Compile Include="src\main\csharp\Transport\ICompositeTransport.cs" />
     <Compile Include="src\main\csharp\Transport\ITransport.cs">
       <SubType>Code</SubType>
     </Compile>
@@ -731,6 +753,7 @@
     <Compile Include="src\main\csharp\Transport\Tcp\TcpTransportFactory.cs">
       <SubType>Code</SubType>
     </Compile>
+    <Compile Include="src\main\csharp\Transport\TransportFactory.cs" />
     <Compile Include="src\main\csharp\Transport\TransportFilter.cs">
       <SubType>Code</SubType>
     </Compile>


Reply via email to