Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs?rev=707747&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs Fri Oct 24 14:10:22 2008 @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Threading; + +namespace Apache.NMS.ActiveMQ.Threads +{ + /// <summary> + /// Manages the thread pool for long running tasks. Long running tasks are not + /// always active but when they are active, they may need a few iterations of + /// processing for them to become idle. The manager ensures that each task is + /// processed but that no one task overtakes the system. This is kind of like + /// cooperative multitasking. 
/// </summary>
    public class TaskRunnerFactory
    {
        // Settings captured by initTaskRunnerFactory. Within this class only
        // maxIterationsPerRun is read back (by CreateTaskRunner); the other
        // three are stored but not consulted here.
        private String name;
        private ThreadPriority priority;
        private bool daemon;
        private int maxIterationsPerRun;

        /// <summary>
        /// Creates a factory with the defaults: name "ActiveMQ Task", normal
        /// priority, daemon = true and 1000 iterations per run.
        /// </summary>
        public TaskRunnerFactory()
            : this("ActiveMQ Task", ThreadPriority.Normal, true, 1000, false)
        {
        }

        /// <summary>
        /// Creates a factory with explicit settings and dedicatedTaskRunner = false.
        /// </summary>
        public TaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int maxIterationsPerRun)
            : this(name, priority, daemon, maxIterationsPerRun, false)
        {
        }

        /// <summary>
        /// Creates a factory with explicit settings.
        /// </summary>
        public TaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int maxIterationsPerRun, bool dedicatedTaskRunner)
        {
            initTaskRunnerFactory(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner);
        }

        /// <summary>
        /// Stores the supplied settings on this instance.
        /// </summary>
        /// <param name="name">Name recorded for the factory.</param>
        /// <param name="priority">Thread priority recorded for the factory.</param>
        /// <param name="daemon">Daemon flag recorded for the factory.</param>
        /// <param name="maxIterationsPerRun">Passed to every runner created by <see cref="CreateTaskRunner"/>.</param>
        /// <param name="dedicatedTaskRunner">Accepted but currently not used.</param>
        public void initTaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int maxIterationsPerRun, bool dedicatedTaskRunner)
        {
            this.maxIterationsPerRun = maxIterationsPerRun;
            this.daemon = daemon;
            this.priority = priority;
            this.name = name;

            // On a platform with a good thread model you may prefer a
            // DedicatedTaskRunner over pooled execution; that choice is not
            // implemented here, so dedicatedTaskRunner is ignored.
        }

        /// <summary>
        /// Currently a no-op.
        /// </summary>
        public void shutdown()
        {
        }

        /// <summary>
        /// Creates a pooled runner for <paramref name="task"/>.
        /// </summary>
        /// <param name="task">The task the runner will iterate.</param>
        /// <param name="name">Accepted but currently not used.</param>
        /// <returns>A new PooledTaskRunner limited to maxIterationsPerRun.</returns>
        public TaskRunner CreateTaskRunner(Task task, String name)
        {
            TaskRunner runner = new PooledTaskRunner(task, maxIterationsPerRun);
            return runner;
        }
    }
}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs?rev=707747&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs Fri Oct 24 14:10:22 2008 @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

using System;

using Apache.NMS.ActiveMQ.Commands;

namespace Apache.NMS.ActiveMQ.Transport.Failover
{
    /// <summary>
    /// Pairs a transport with the URI it was created for and reports failures
    /// of that transport back to the owning <see cref="FailoverTransport"/>.
    /// Two instances are considered equal when their URIs are equal.
    /// </summary>
    class BackupTransport
    {
        private FailoverTransport failoverTransport;
        private ITransport transport;
        private Uri uri;
        private bool disposed;

        /// <param name="ft">Owner to notify on failure; may be null.</param>
        public BackupTransport(FailoverTransport ft)
        {
            this.failoverTransport = ft;
        }

        /// <summary>
        /// Command handler for the wrapped transport; incoming commands are ignored.
        /// </summary>
        public void onCommand(ITransport t, Command c)
        {
        }

        /// <summary>
        /// Exception handler for the wrapped transport: marks this backup as
        /// disposed and asks the owner (if any) to reconnect.
        /// </summary>
        public void onException(ITransport t, Exception error)
        {
            this.disposed = true;
            if(failoverTransport != null)
            {
                this.failoverTransport.Reconnect();
            }
        }

        public ITransport Transport
        {
            get { return transport; }
            set { transport = value; }
        }

        public Uri Uri
        {
            get { return uri; }
            set { uri = value; }
        }

        /// <summary>
        /// True once this backup was flagged as disposed, or when the wrapped
        /// transport itself reports IsDisposed.
        /// </summary>
        public bool Disposed
        {
            get { return disposed || (transport != null && transport.IsDisposed); }
            set { disposed = value; }
        }

        /// <summary>
        /// Hash code derived from the URI, or -1 when no URI is set.
        /// Kept for existing callers; <see cref="GetHashCode"/> delegates here.
        /// </summary>
        public int hashCode()
        {
            return uri != null ? uri.GetHashCode() : -1;
        }

        /// <summary>
        /// URI-based equality: true when <paramref name="obj"/> is a
        /// BackupTransport and both URIs are null or equal.
        /// Kept for existing callers; <see cref="Equals(Object)"/> delegates here.
        /// </summary>
        public bool equals(Object obj)
        {
            BackupTransport other = obj as BackupTransport;
            if(Object.ReferenceEquals(other, null))
            {
                return false;
            }
            // Static Object.Equals: true for two nulls, false for exactly one
            // null, otherwise uri.Equals(other.uri).
            return Object.Equals(uri, other.uri);
        }

        // The lower-case methods above do not participate in .NET equality, so
        // List<BackupTransport>.Contains/Remove would otherwise compare by
        // reference. These overrides make URI equality effective there.
        public override bool Equals(Object obj)
        {
            return equals(obj);
        }

        public override int GetHashCode()
        {
            return hashCode();
        }
    }
}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=707747&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Fri Oct 24 14:10:22 2008 @@ -0,0 +1,1142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Threading; +using Apache.NMS.ActiveMQ.Commands; +using Apache.NMS.ActiveMQ.State; +using Apache.NMS.ActiveMQ.Threads; +using Apache.NMS.Util; + +namespace Apache.NMS.ActiveMQ.Transport.Failover +{ + /// <summary> + /// A Transport that is made reliable by being able to fail over to another + /// transport when a transport failure is detected. 
/// </summary>
    public class FailoverTransport : ICompositeTransport, IComparable
    {

        private static int _idCounter = 0;
        private int _id;

        private bool disposed;
        private bool connected;
        private List<Uri> uris = new List<Uri>();
        private CommandHandler _commandHandler;
        private ExceptionHandler _exceptionHandler;

        private Mutex reconnectMutex = new Mutex();
        private Mutex backupMutex = new Mutex();
        private Mutex sleepMutex = new Mutex();
        private Mutex listenerMutex = new Mutex();
        private ConnectionStateTracker stateTracker = new ConnectionStateTracker();
        private Dictionary<int, Command> requestMap = new Dictionary<int, Command>();

        private Uri connectedTransportURI;
        private Uri failedConnectTransportURI;
        private AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
        private TaskRunner reconnectTask = null;
        private bool started;

        private int _initialReconnectDelay = 10;
        private int _maxReconnectDelay = 1000 * 30;
        private int _backOffMultiplier = 2;
        private bool _useExponentialBackOff = true;
        private bool _randomize = true;
        private bool initialized;
        private int _maxReconnectAttempts;
        private int connectFailures;
        private int _reconnectDelay = 10;
        private Exception connectionFailure;
        private bool firstConnection = true;
        //optionally always have a backup created
        private bool _backup = false;
        private List<BackupTransport> backups = new List<BackupTransport>();
        private int _backupPoolSize = 1;
        private bool _trackMessages = false;
        private int _maxCacheSize = 256;
        private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;

        public TimeSpan RequestTimeout
        {
            get { return requestTimeout; }
            set { requestTimeout = value; }
        }

        /// <summary>
        /// Task run by the reconnect task runner. One iteration either tries to
        /// reconnect (when there is no connected transport) or tops up the
        /// backup pool.
        /// </summary>
        private class FailoverTask : Task
        {
            private FailoverTransport parent;

            public FailoverTask(FailoverTransport p)
            {
                parent = p;
            }

            /// <returns>True when the runner should call iterate again.</returns>
            public bool iterate()
            {
                bool result = false;
                bool buildBackup = true;
                try
                {
                    parent.backupMutex.WaitOne();
                    if(parent.ConnectedTransport == null && !parent.disposed)
                    {
                        result = parent.doReconnect();
                        buildBackup = false;
                    }
                }
                finally
                {
                    parent.backupMutex.ReleaseMutex();
                }
                if(buildBackup)
                {
                    parent.buildBackups();
                }
                else
                {
                    //build backups on the next iteration
                    result = true;
                    try
                    {
                        parent.reconnectTask.wakeup();
                    }
                    catch(ThreadInterruptedException e)
                    {
                        e.GetType();
                        Tracer.Debug("Reconnect task has been interrupted.");
                    }
                }
                return result;
            }
        }

        public FailoverTransport()
        {
            _id = _idCounter++;

            stateTracker.TrackTransactions = true;
        }

        ~FailoverTransport()
        {
            Dispose(false);
        }

        /// <summary>
        /// Command handler installed on the underlying transport. Completes
        /// tracked requests when their response arrives, learns peer broker
        /// URIs from the first BrokerInfo, then forwards the command to this
        /// transport's own Command handler.
        /// </summary>
        public void onCommand(ITransport sender, Command command)
        {
            if(command != null)
            {
                if(command.IsResponse)
                {
                    Command pending = null;
                    lock(requestMap)
                    {
                        int v = ((Response) command).CorrelationId;
                        // A response for an id we are not tracking is normal;
                        // look it up without relying on an exception.
                        if(requestMap.TryGetValue(v, out pending))
                        {
                            requestMap.Remove(v);
                        }
                    }
                    Tracked t = pending as Tracked;
                    if(t != null)
                    {
                        t.onResponses();
                    }
                }
                if(!initialized)
                {
                    if(command.IsBrokerInfo)
                    {
                        BrokerInfo info = (BrokerInfo) command;
                        BrokerInfo[] peers = info.PeerBrokerInfos;
                        if(peers != null)
                        {
                            for(int i = 0; i < peers.Length; i++)
                            {
                                String brokerString = peers[i].BrokerURL;
                                add(brokerString);
                            }
                        }
                        initialized = true;
                    }
                }
            }
            this.Command(sender, command);
        }

        /// <summary>
        /// Exception handler installed on the underlying transport; routes the
        /// error into <see cref="handleTransportFailure"/>.
        /// </summary>
        public void onException(ITransport sender, Exception error)
        {
            try
            {
                handleTransportFailure(error);
            }
            catch(Exception e)
            {
                // Nothing sensible can be propagated from a transport callback,
                // but do not lose the error silently.
                Tracer.Error("Failed to handle transport failure: " + e.Message);
            }
        }

        /// <summary>No-op command handler installed on a transport that is being discarded.</summary>
        public void disposedOnCommand(ITransport sender, Command c)
        {
        }

        /// <summary>No-op exception handler installed on a transport that is being discarded.</summary>
        public void disposedOnException(ITransport sender, Exception e)
        {
        }

        /// <summary>
        /// Detaches and stops the currently connected transport (if any),
        /// clears the connection state and, when started, triggers a reconnect.
        /// </summary>
        public void handleTransportFailure(Exception e)
        {
            ITransport transport = connectedTransport.GetAndSet(null);
            if(transport != null)
            {
                transport.Command = new CommandHandler(disposedOnCommand);
                transport.Exception = new ExceptionHandler(disposedOnException);
                try
                {
                    transport.Stop();
                }
                catch(Exception ex)
                {
                    ex.GetType();	// Ignore errors but this lets us see the error during debugging
                }

                try
                {
                    reconnectMutex.WaitOne();
                    bool reconnectOk = false;
                    if(started)
                    {
                        Tracer.Warn("Transport failed to " + ConnectedTransportURI + " , attempting to automatically reconnect due to: " + e.Message);
                        Tracer.Debug("Transport failed with the following exception:" + e.Message);
                        reconnectOk = true;
                    }

                    initialized = false;
                    failedConnectTransportURI = ConnectedTransportURI;
                    ConnectedTransportURI = null;
                    connected = false;
                    if(reconnectOk)
                    {
                        // Reconnect() creates the reconnect task on first use.
                        // The task can still be null here when Start() found a
                        // preset ConnectedTransport and never called Reconnect().
                        Reconnect();
                    }
                }
                finally
                {
                    reconnectMutex.ReleaseMutex();
                }
            }
        }

        /// <summary>
        /// Starts the transport. Restores state onto an already-set
        /// ConnectedTransport, otherwise kicks off the reconnect task.
        /// Calling Start on a started transport does nothing.
        /// </summary>
        public void Start()
        {
            try
            {
                reconnectMutex.WaitOne();
                Tracer.Debug("Started.");
                if(started)
                {
                    return;
                }
                started = true;
                stateTracker.MaxCacheSize = MaxCacheSize;
                stateTracker.TrackMessages = TrackMessages;
                if(ConnectedTransport != null)
                {
                    stateTracker.DoRestore(ConnectedTransport);
                }
                else
                {
                    Reconnect();
                }
            }
            finally
            {
                reconnectMutex.ReleaseMutex();
            }
        }

        /// <summary>
        /// Stops the transport: marks it disposed, drops all backups, shuts the
        /// reconnect task down and stops the connected transport.
        /// Calling Stop on a transport that is not started does nothing.
        /// </summary>
        public virtual void Stop()
        {
            ITransport transportToStop = null;
            try
            {
                reconnectMutex.WaitOne();
                Tracer.Debug("Stopped.");
                if(!started)
                {
                    return;
                }
                started = false;
                disposed = true;
                connected = false;
                foreach(BackupTransport t in backups)
                {
                    t.Disposed = true;
                }
                backups.Clear();

                if(ConnectedTransport != null)
                {
                    transportToStop = connectedTransport.GetAndSet(null);
                }
            }
            finally
            {
                reconnectMutex.ReleaseMutex();
            }
            // Acquire and release the sleep mutex so that a reconnect delay in
            // doReconnect() has finished before the task is shut down.
            try
            {
                sleepMutex.WaitOne();
            }
            finally
            {
                sleepMutex.ReleaseMutex();
            }
            // The task only exists once Reconnect() has run.
            if(reconnectTask != null)
            {
                reconnectTask.shutdown();
            }
            if(transportToStop != null)
            {
                transportToStop.Stop();
            }
        }

        public int InitialReconnectDelay
        {
            get { return _initialReconnectDelay; }
            set { _initialReconnectDelay = value; }
        }

        public int MaxReconnectDelay
        {
            get { return _maxReconnectDelay; }
            set { _maxReconnectDelay = value; }
        }

        public int ReconnectDelay
        {
            get { return _reconnectDelay; }
            set { _reconnectDelay = value; }
        }

        public int ReconnectDelayExponent
        {
            get { return _backOffMultiplier; }
            set { _backOffMultiplier = value; }
        }

        public ITransport ConnectedTransport
        {
            get { return connectedTransport.Value; }
            set { connectedTransport.Value = value; }
        }

        public Uri ConnectedTransportURI
        {
            get { return connectedTransportURI; }
            set { connectedTransportURI = value; }
        }

        public int MaxReconnectAttempts
        {
            get { return _maxReconnectAttempts; }
            set { _maxReconnectAttempts = value; }
        }

        public bool Randomize
        {
            get { return _randomize; }
            set { _randomize = value; }
        }

        public bool Backup
        {
            get { return _backup; }
            set { _backup = value; }
        }

        public int BackupPoolSize
        {
            get { return _backupPoolSize; }
            set { _backupPoolSize = value; }
        }

        public bool TrackMessages
        {
            get { return _trackMessages; }
            set { _trackMessages = value; }
        }

        public int MaxCacheSize
        {
            get { return _maxCacheSize; }
            set { _maxCacheSize = value; }
        }

        /// <summary>
        /// </summary>
        /// <param name="command"></param>
        /// <returns>Returns true if the command is one sent when a connection is being closed.</returns>
        private bool IsShutdownCommand(Command command)
        {
            return (command != null && (command.IsShutdownInfo || command is RemoveInfo));
        }

        /// <summary>
        /// Sends a command over the connected transport, waiting for a
        /// reconnect while there is none. Commands that expect a response are
        /// remembered in requestMap so that restoreTransport can replay them.
        /// </summary>
        /// <exception cref="ThreadInterruptedException">The calling thread was interrupted.</exception>
        public void Oneway(Command command)
        {
            Exception error = null;
            try
            {

                try
                {
                    reconnectMutex.WaitOne();

                    if(IsShutdownCommand(command) && ConnectedTransport == null)
                    {
                        if(command.IsShutdownInfo)
                        {
                            // Skipping send of ShutdownInfo command when not connected.
                            return;
                        }
                        if(command is RemoveInfo)
                        {
                            // Simulate response to RemoveInfo command
                            Response response = new Response();
                            response.CorrelationId = command.CommandId;
                            onCommand(this, response);
                            return;
                        }
                    }
                    // Keep trying until the message is sent.
                    for(int i = 0; !disposed; i++)
                    {
                        try
                        {
                            // Wait for transport to be connected.
                            ITransport transport = ConnectedTransport;
                            while(transport == null && !disposed
                                && connectionFailure == null)
                            {
                                Tracer.Info("Waiting for transport to reconnect.");
                                try
                                {
                                    // Release so that the reconnect task can run
                                    reconnectMutex.ReleaseMutex();
                                    try
                                    {
                                        // Wait for something
                                        Thread.Sleep(1000);
                                    }
                                    catch(ThreadInterruptedException e)
                                    {
                                        Thread.CurrentThread.Interrupt();	// KILROY - not needed
                                        Tracer.Debug("Interupted: " + e);
                                    }
                                }
                                finally
                                {
                                    reconnectMutex.WaitOne();
                                }
                                transport = ConnectedTransport;
                            }

                            if(transport == null)
                            {
                                // Previous loop may have exited due to use being
                                // disposed.
                                if(disposed)
                                {
                                    error = new IOException("Transport disposed.");
                                }
                                else if(connectionFailure != null)
                                {
                                    error = connectionFailure;
                                }
                                else
                                {
                                    error = new IOException("Unexpected failure.");
                                }
                                break;
                            }

                            // If it was a request and it was not being tracked by
                            // the state tracker,
                            // then hold it in the requestMap so that we can replay
                            // it later.
                            Tracked tracked = stateTracker.track(command);
                            lock(requestMap)
                            {
                                // Indexer rather than Add: storing an id that is
                                // already present must replace the entry, not throw.
                                if(tracked != null && tracked.WaitingForResponse)
                                {
                                    requestMap[command.CommandId] = tracked;
                                }
                                else if(tracked == null && command.ResponseRequired)
                                {
                                    requestMap[command.CommandId] = command;
                                }
                            }

                            // Send the message.
                            try
                            {
                                transport.Oneway(command);
                                stateTracker.trackBack(command);
                            }
                            catch(Exception)
                            {

                                // If the command was not tracked.. we will retry in
                                // this method
                                if(tracked == null)
                                {

                                    // since we will retry in this method.. take it
                                    // out of the request
                                    // map so that it is not sent 2 times on
                                    // recovery
                                    if(command.ResponseRequired)
                                    {
                                        lock(requestMap)
                                        {
                                            requestMap.Remove(command.CommandId);
                                        }
                                    }

                                    // Rethrow (preserving the stack trace) so it
                                    // is handled by the outer catch
                                    throw;
                                }

                            }

                            return;

                        }
                        catch(Exception e)
                        {
                            Tracer.Debug("Send Oneway attempt: " + i + " failed.");
                            handleTransportFailure(e);
                        }
                    }
                }
                finally
                {
                    reconnectMutex.ReleaseMutex();
                }
            }
            catch(ThreadInterruptedException e)
            {
                e.GetType();
                // Some one may be trying to stop our thread.
                Thread.CurrentThread.Interrupt();
                throw new ThreadInterruptedException();
            }
            if(!disposed)
            {
                if(error != null)
                {
                    throw error;
                }
            }
        }

        /*
        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) {
        throw new AssertionError("Unsupported Method");
        }
        */

        public Object Request(Object command)
        {
            throw new ApplicationException("FailoverTransport does not support Request(Object)");
        }

        public Object Request(Object command, int timeout)
        {
            throw new ApplicationException("FailoverTransport does not support Request(Object, Int)");
        }

        /// <summary>Adds the URIs not already known, then triggers a reconnect.</summary>
        public void add(Uri[] u)
        {
            lock(uris)
            {
                for(int i = 0; i < u.Length; i++)
                {
                    if(!uris.Contains(u[i]))
                    {
                        uris.Add(u[i]);
                    }
                }
            }
            Reconnect();
        }

        /// <summary>Removes the given URIs, then triggers a reconnect.</summary>
        public void remove(Uri[] u)
        {
            lock(uris)
            {
                for(int i = 0; i < u.Length; i++)
                {
                    uris.Remove(u[i]);
                }
            }
            Reconnect();
        }

        /// <summary>
        /// Parses <paramref name="u"/> and adds it like <see cref="add(Uri[])"/>.
        /// A string that cannot be parsed is logged and ignored.
        /// </summary>
        public void add(String u)
        {
            try
            {
                Uri uri = new Uri(u);
                lock(uris)
                {
                    if(!uris.Contains(uri))
                    {
                        uris.Add(uri);
                    }
                }

                Reconnect();
            }
            catch(Exception e)
            {
                Tracer.Error("Failed to parse URI: " + u + " because " + e.Message);
            }
        }

        /// <summary>
        /// Wakes the reconnect task, creating it on first use. Does nothing
        /// (apart from logging) while the transport is not started.
        /// </summary>
        public void Reconnect()
        {
            try
            {
                reconnectMutex.WaitOne();

                if(started)
                {
                    if(reconnectTask == null)
                    {
                        Tracer.Debug("Creating reconnect task");
                        reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(new FailoverTask(this),
                                            "ActiveMQ Failover Worker: " + this.GetHashCode().ToString());
                    }

                    Tracer.Debug("Waking up reconnect task");
                    try
                    {
                        reconnectTask.wakeup();
                    }
                    catch(ThreadInterruptedException e)
                    {
                        e.GetType();	// Suppress warning
                        Thread.CurrentThread.Interrupt();
                    }
                }
                else
                {
                    Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
                }
            }
            finally
            {
                reconnectMutex.ReleaseMutex();
            }
        }

        /// <summary>
        /// A copy of the known URIs in the order they should be tried:
        /// optionally shuffled, with the URI that failed last moved to the end.
        /// </summary>
        private List<Uri> ConnectList
        {
            get
            {
                List<Uri> l;
                // add()/remove() mutate uris under this lock; copy under it too.
                lock(uris)
                {
                    l = new List<Uri>(uris);
                }
                bool removed = false;
                if(failedConnectTransportURI != null)
                {
                    removed = l.Remove(failedConnectTransportURI);
                }
                if(Randomize)
                {
                    // Randomly, reorder the list by random swapping
                    Random r = new Random(DateTime.Now.Millisecond);
                    for(int i = 0; i < l.Count; i++)
                    {
                        int p = r.Next(l.Count);
                        Uri t = l[p];
                        l[p] = l[i];
                        l[i] = t;
                    }
                }
                if(removed)
                {
                    l.Add(failedConnectTransportURI);
                }
                return l;
            }
        }

        /// <summary>
        /// Starts <paramref name="t"/>, announces this client as fault
        /// tolerant, restores tracked state and replays the commands still
        /// held in requestMap.
        /// </summary>
        protected void restoreTransport(ITransport t)
        {
            t.Start();
            //send information to the broker - informing it we are an ft client
            ConnectionControl cc = new ConnectionControl();
            cc.FaultTolerant = true;
            t.Oneway(cc);
            stateTracker.DoRestore(t);
            Dictionary<int, Command> tmpMap = null;
            lock(requestMap)
            {
                tmpMap = new Dictionary<int, Command>(requestMap);
            }
            foreach(Command command in tmpMap.Values)
            {
                t.Oneway(command);
            }
        }

        public bool UseExponentialBackOff
        {
            get { return _useExponentialBackOff; }
            set { _useExponentialBackOff = value; }
        }

        public override String ToString()
        {
            return ConnectedTransportURI == null ? "unconnected" : ConnectedTransportURI.ToString();
        }

        // NOTE(review): this only yields a value when the connected transport
        // is itself a FailoverTransport; for any other transport it returns
        // null. Confirm whether that is intended.
        public String RemoteAddress
        {
            get
            {
                FailoverTransport transport = ConnectedTransport as FailoverTransport;
                if(transport != null)
                {
                    return transport.RemoteAddress;
                }
                return null;
            }
        }

        public bool IsFaultTolerant
        {
            get { return true; }
        }

        /// <summary>
        /// One reconnect attempt: first a ready backup (when enabled), then
        /// each URI from ConnectList. On failure sleeps for ReconnectDelay and
        /// applies the exponential back-off.
        /// </summary>
        /// <returns>
        /// False when connected, disposed, already failed permanently or the
        /// attempt limit was reached; otherwise true while not disposed, so
        /// the caller tries again.
        /// </returns>
        bool doReconnect()
        {
            Exception failure = null;
            try
            {
                reconnectMutex.WaitOne();

                if(ConnectedTransport != null || disposed || connectionFailure != null)
                {
                    return false;
                }
                else
                {
                    List<Uri> connectList = ConnectList;
                    if(connectList.Count == 0)
                    {
                        failure = new IOException("No uris available to connect to.");
                    }
                    else
                    {
                        if(!UseExponentialBackOff)
                        {
                            ReconnectDelay = InitialReconnectDelay;
                        }
                        try
                        {
                            backupMutex.WaitOne();
                            if(Backup && backups.Count != 0)
                            {
                                BackupTransport bt = backups[0];
                                backups.RemoveAt(0);
                                ITransport t = bt.Transport;
                                Uri uri = bt.Uri;
                                t.Command = new CommandHandler(onCommand);
                                t.Exception = new ExceptionHandler(onException);
                                try
                                {
                                    if(started)
                                    {
                                        restoreTransport(t);
                                    }
                                    ReconnectDelay = InitialReconnectDelay;
                                    failedConnectTransportURI = null;
                                    ConnectedTransportURI = uri;
                                    ConnectedTransport = t;
                                    connectFailures = 0;
                                    // Keep Connected in step with the direct
                                    // connect path below.
                                    connected = true;
                                    Tracer.Info("Successfully reconnected to backup " + uri);
                                    return false;
                                }
                                catch(Exception e)
                                {
                                    e.GetType();
                                    Tracer.Debug("Backup transport failed");
                                }
                            }
                        }
                        finally
                        {
                            backupMutex.ReleaseMutex();
                        }

                        foreach(Uri uri in connectList)
                        {
                            if(ConnectedTransport != null || disposed)
                            {
                                break;
                            }

                            try
                            {
                                Tracer.Debug("Attempting connect to: " + uri);
                                ITransport t = TransportFactory.CompositeConnect(uri);
                                t.Command = new CommandHandler(onCommand);
                                t.Exception = new ExceptionHandler(onException);
                                t.Start();

                                if(started)
                                {
                                    restoreTransport(t);
                                }

                                Tracer.Debug("Connection established");
                                ReconnectDelay = InitialReconnectDelay;
                                ConnectedTransportURI = uri;
                                ConnectedTransport = t;
                                connectFailures = 0;

                                if(firstConnection)
                                {
                                    firstConnection = false;
                                    Tracer.Info("Successfully connected to " + uri);
                                }
                                else
                                {
                                    Tracer.Info("Successfully reconnected to " + uri);
                                }
                                connected = true;
                                return false;
                            }
                            catch(Exception e)
                            {
                                failure = e;
                                Tracer.Debug("Connect fail to: " + uri + ", reason: " + e);
                            }
                        }
                    }
                }

                if(MaxReconnectAttempts > 0 && ++connectFailures >= MaxReconnectAttempts)
                {
                    Tracer.Error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
                    connectionFailure = failure;

                    // NOTE(review): this calls the handler for the underlying
                    // transport, not this transport's own Exception handler —
                    // confirm that is the intended notification path.
                    onException(this, connectionFailure);

                    return false;
                }
            }
            finally
            {
                reconnectMutex.ReleaseMutex();
            }
            if(!disposed)
            {

                Tracer.Debug("Waiting " + ReconnectDelay + " ms before attempting connection. ");
                try
                {
                    sleepMutex.WaitOne();
                    try
                    {
                        Thread.Sleep(ReconnectDelay);
                    }
                    catch(ThreadInterruptedException e)
                    {
                        e.GetType();
                        Thread.CurrentThread.Interrupt();
                    }
                }
                finally
                {
                    sleepMutex.ReleaseMutex();
                }

                if(UseExponentialBackOff)
                {
                    // Exponential increment of reconnect delay.
                    ReconnectDelay *= ReconnectDelayExponent;
                    if(ReconnectDelay > MaxReconnectDelay)
                    {
                        ReconnectDelay = MaxReconnectDelay;
                    }
                }
            }
            return !disposed;
        }

        /// <summary>
        /// True when the pool already holds a backup for <paramref name="uri"/>.
        /// Compares URIs directly so it does not depend on how BackupTransport
        /// implements equality. Call with backupMutex held.
        /// </summary>
        private bool hasBackupFor(Uri uri)
        {
            foreach(BackupTransport candidate in backups)
            {
                if(uri.Equals(candidate.Uri))
                {
                    return true;
                }
            }
            return false;
        }

        /// <summary>
        /// Drops disposed backups and, when backups are enabled, connects new
        /// ones to URIs other than the connected one until the pool holds
        /// BackupPoolSize entries.
        /// </summary>
        /// <returns>Always false.</returns>
        bool buildBackups()
        {
            try
            {
                backupMutex.WaitOne();
                if(!disposed && Backup && backups.Count < BackupPoolSize)
                {
                    List<Uri> connectList = ConnectList;
                    // RemoveAll rather than Remove inside foreach: mutating a
                    // List<T> while enumerating it throws.
                    backups.RemoveAll(delegate(BackupTransport stale) { return stale.Disposed; });
                    foreach(Uri uri in connectList)
                    {
                        // Stop as soon as the pool is full.
                        if(backups.Count >= BackupPoolSize)
                        {
                            break;
                        }
                        if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
                        {
                            try
                            {
                                if(!hasBackupFor(uri))
                                {
                                    BackupTransport bt = new BackupTransport(this);
                                    bt.Uri = uri;
                                    ITransport t = TransportFactory.CompositeConnect(uri);
                                    t.Command = new CommandHandler(bt.onCommand);
                                    t.Exception = new ExceptionHandler(bt.onException);
                                    t.Start();
                                    bt.Transport = t;
                                    backups.Add(bt);
                                }
                            }
                            catch(Exception e)
                            {
                                e.GetType();
                                Tracer.Debug("Failed to build backup ");
                            }
                        }
                    }
                }
            }
            finally
            {
                backupMutex.ReleaseMutex();
            }
            return false;
        }

        public bool IsDisposed
        {
            get { return disposed; }
        }

        public bool Connected
        {
            get { return connected; }
        }

        /// <summary>Adds <paramref name="uri"/> to the known URIs and triggers a reconnect.</summary>
        public void Reconnect(Uri uri)
        {
            add(new Uri[] { uri });
        }

        public FutureResponse AsyncRequest(Command command)
        {
            throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
        }

        public Response Request(Command command)
        {
            throw new ApplicationException("FailoverTransport does not implement Request(Command)");
        }

        public Response Request(Command command, TimeSpan ts)
        {
            throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
        }

        public CommandHandler Command
        {
            get { return _commandHandler; }
            set { _commandHandler = value; }
        }

        public ExceptionHandler Exception
        {
            get { return _exceptionHandler; }
            set { _exceptionHandler = value; }
        }

        public bool IsStarted
        {
            get { return started; }
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>Marks the transport as disposed; no resources are released here.</summary>
        public void Dispose(bool disposing)
        {
            if(disposing)
            {
                // get rid of unmanaged stuff
            }
            disposed = true;
        }

        /// <summary>Orders transports by creation id.</summary>
        /// <exception cref="ArgumentException"><paramref name="o"/> is not a FailoverTransport.</exception>
        public int CompareTo(Object o)
        {
            if(o is FailoverTransport)
            {
                FailoverTransport oo = o as FailoverTransport;

                return this._id - oo._id;
            }
            else
            {
                throw new ArgumentException();
            }
        }
    }
}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs?rev=707747&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs Fri Oct 24 14:10:22 2008 @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

using System;
using System.Collections.Specialized;

using Apache.NMS.Util;

namespace Apache.NMS.ActiveMQ.Transport.Failover
{
    /// <summary>
    /// Builds <see cref="FailoverTransport"/> instances from composite URIs.
    /// </summary>
    public class FailoverTransportFactory : ITransportFactory
    {
        /// <summary>
        /// Builds the failover transport for <paramref name="location"/> and
        /// wraps it in a MutexTransport and a ResponseCorrelator.
        /// </summary>
        public ITransport CreateTransport(Uri location)
        {
            ITransport failover = CreateTransport(URISupport.parseComposite(location));
            return decorate(failover);
        }

        /// <summary>
        /// Builds the failover transport for <paramref name="location"/>
        /// without the MutexTransport/ResponseCorrelator wrappers.
        /// </summary>
        public ITransport CompositeConnect(Uri location)
        {
            URISupport.CompositeData parsed = URISupport.parseComposite(location);
            return CreateTransport(parsed);
        }

        /// <summary>
        /// Creates a transport configured from the composite's parameters and
        /// registers the composite's component URIs with it.
        /// </summary>
        /// <param name="compositData">Parsed composite URI.</param>
        /// <returns>The configured failover transport.</returns>
        public ITransport CreateTransport(URISupport.CompositeData compositData)
        {
            FailoverTransport result = CreateTransport(compositData.Parameters);
            result.add(compositData.Components);
            return result;
        }

        /// <summary>
        /// Creates a transport and applies <paramref name="parameters"/> to its
        /// properties via URISupport.SetProperties.
        /// </summary>
        public FailoverTransport CreateTransport(StringDictionary parameters)
        {
            FailoverTransport result = new FailoverTransport();
            URISupport.SetProperties(result, parameters, "");
            return result;
        }

        // Layers the mutex and response-correlation wrappers over a transport.
        private ITransport decorate(ITransport inner)
        {
            ITransport guarded = new MutexTransport(inner);
            return new ResponseCorrelator(guarded);
        }
    }
}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs?rev=707747&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs Fri Oct 24 14:10:22 2008 @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; + +namespace Apache.NMS.ActiveMQ.Transport +{ + public interface ICompositeTransport : ITransport + { + void add(Uri[] uris); + void remove(Uri[] uris); + } +} + Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs?rev=707747&r1=707746&r2=707747&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs Fri Oct 24 14:10:22 2008 @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -using Apache.NMS.ActiveMQ.Commands; -using Apache.NMS; + using System; +using Apache.NMS.ActiveMQ.Commands; namespace Apache.NMS.ActiveMQ.Transport { @@ -26,11 +26,11 @@ /// <summary> /// Represents the logical networking transport layer. 
/// </summary> - public interface ITransport : IStartable, IDisposable - { - void Oneway(Command command); - - FutureResponse AsyncRequest(Command command); + public interface ITransport : IStartable, IDisposable, IStoppable + { + void Oneway(Command command); + + FutureResponse AsyncRequest(Command command); TimeSpan RequestTimeout { @@ -38,20 +38,25 @@ set; } - Response Request(Command command); + Response Request(Command command); Response Request(Command command, TimeSpan timeout); - - CommandHandler Command + + CommandHandler Command + { + get; + set; + } + + ExceptionHandler Exception { - get; - set; - } - - ExceptionHandler Exception + get; + set; + } + + bool IsDisposed { - get; - set; - } - } + get; + } + } } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs?rev=707747&r1=707746&r2=707747&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs Fri Oct 24 14:10:22 2008 @@ -20,7 +20,8 @@ namespace Apache.NMS.ActiveMQ.Transport { public interface ITransportFactory - { + { ITransport CreateTransport(Uri location); + ITransport CompositeConnect(Uri location); } } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=707747&r1=707746&r2=707747&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original) +++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Fri Oct 24 14:10:22 2008 @@ -14,14 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -using Apache.NMS.ActiveMQ.Commands; -using Apache.NMS.ActiveMQ.OpenWire; -using Apache.NMS.ActiveMQ.Transport; -using Apache.NMS.Util; + using System; using System.IO; using System.Net.Sockets; using System.Threading; +using Apache.NMS.ActiveMQ.Commands; +using Apache.NMS.ActiveMQ.OpenWire; +using Apache.NMS.Util; namespace Apache.NMS.ActiveMQ.Transport.Tcp { @@ -37,6 +37,7 @@ private BinaryWriter socketWriter; private Thread readThread; private bool started; + private bool disposed = false; private AtomicBoolean closed = new AtomicBoolean(false); private volatile bool seenShutdown; private TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite); @@ -63,15 +64,15 @@ { lock(myLock) { - if (!started) + if(!started) { - if (null == commandHandler) + if(null == commandHandler) { throw new InvalidOperationException( "command cannot be null when Start is called."); } - if (null == exceptionHandler) + if(null == exceptionHandler) { throw new InvalidOperationException( "exception cannot be null when Start is called."); @@ -179,6 +180,11 @@ throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls"); } + public void Stop() + { + Close(); + } + public void Close() { if(closed.CompareAndSet(false, true)) @@ -235,9 +241,9 @@ { if(Thread.CurrentThread != readThread #if !NETCF - && readThread.IsAlive + && readThread.IsAlive #endif - ) +) { TimeSpan waitTime; @@ -273,6 +279,15 @@ protected void Dispose(bool disposing) { Close(); + disposed = true; + } + + public bool IsDisposed + { + get + { + return disposed; + } } public void ReadLoop() @@ -337,7 +352,7 @@ set { this.commandHandler = value; } } - public ExceptionHandler Exception + public ExceptionHandler Exception { get { return 
exceptionHandler; } set { this.exceptionHandler = value; } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=707747&r1=707746&r2=707747&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Fri Oct 24 14:10:22 2008 @@ -16,14 +16,12 @@ */ using System; +using System.Collections.Specialized; using System.Net; using System.Net.Sockets; -using System.Collections.Specialized; using Apache.NMS.ActiveMQ.OpenWire; using Apache.NMS.ActiveMQ.Transport.Stomp; -using Apache.NMS; using Apache.NMS.Util; -using System.Threading; namespace Apache.NMS.ActiveMQ.Transport.Tcp { @@ -103,7 +101,7 @@ #region ITransportFactory Members - public ITransport CreateTransport(Uri location) + public ITransport CompositeConnect(Uri location) { // Extract query parameters from broker Uri StringDictionary map = URISupport.ParseQuery(location.Query); @@ -136,6 +134,15 @@ transport = new WireFormatNegotiator(transport, (OpenWireFormat) wireformat); } + transport.RequestTimeout = this.requestTimeout; + + return transport; + } + + public ITransport CreateTransport(Uri location) + { + ITransport transport = CompositeConnect(location); + transport = new MutexTransport(transport); transport = new ResponseCorrelator(transport); transport.RequestTimeout = this.requestTimeout; Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs?rev=707747&view=auto 
============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs Fri Oct 24 14:10:22 2008 @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; + +using Apache.NMS.ActiveMQ.Transport.Failover; +using Apache.NMS.ActiveMQ.Transport.Tcp; + +namespace Apache.NMS.ActiveMQ.Transport +{ + /// <summary> + /// Static entry point that selects an ITransportFactory by the scheme of a URI + /// ("tcp" or "failover") and delegates transport creation to it. + /// </summary> + public class TransportFactory + { + + private static readonly Dictionary<String, ITransportFactory> TRANSPORT_FACTORYS = new Dictionary<String, ITransportFactory>(); + + static TransportFactory() + { + TRANSPORT_FACTORYS.Add("tcp", new TcpTransportFactory()); + TRANSPORT_FACTORYS.Add("failover", new FailoverTransportFactory()); + } + + /// <summary> + /// Creates a normal transport. 
+ /// </summary> + /// <param name="location">URI whose scheme selects the factory and which is passed on to it</param> + /// <returns>the transport</returns> + public static ITransport CreateTransport(Uri location) + { + ITransportFactory tf = findTransportFactory(location); + return tf.CreateTransport(location); + } + + /// <summary> + /// Delegates to CompositeConnect on the factory registered for the URI's scheme. + /// </summary> + /// <param name="location">URI whose scheme selects the factory and which is passed on to it</param> + /// <returns>the transport returned by that factory's CompositeConnect</returns> + public static ITransport CompositeConnect(Uri location) + { + ITransportFactory tf = findTransportFactory(location); + return tf.CompositeConnect(location); + } + + /// <summary> + /// Looks up the transport factory registered for the scheme of the given URI. + /// </summary> + /// <param name="location">URI whose scheme selects the factory</param> + /// <returns>the factory registered for that scheme</returns> + /// <exception cref="IOException">the URI's scheme is null</exception> + /// <exception cref="ApplicationException">no factory is registered for the scheme</exception> + private static ITransportFactory findTransportFactory(Uri location) + { + String scheme = location.Scheme; + if(scheme == null) + { + throw new IOException("Transport scheme not specified: [" + location + "]"); + } + // TryGetValue instead of the indexer: the indexer throws KeyNotFoundException for an + // unregistered scheme, so the ApplicationException below could never be reached. + ITransportFactory tf; + if(!TRANSPORT_FACTORYS.TryGetValue(scheme, out tf)) + { + throw new ApplicationException("Transport Factory for " + scheme + " does not exist."); + } + return tf; + } + } +} Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=707747&r1=707746&r2=707747&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs Fri Oct 24 14:10:22 2008 @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -using Apache.NMS.ActiveMQ.Commands; -using Apache.NMS.ActiveMQ.Transport; + using System; +using Apache.NMS.ActiveMQ.Commands; namespace Apache.NMS.ActiveMQ.Transport { @@ -28,6 +28,7 @@ protected readonly ITransport next; protected CommandHandler commandHandler; protected ExceptionHandler exceptionHandler; + private bool disposed = false; public TransportFilter(ITransport next) { @@ -107,12 +108,12 @@ { if(commandHandler == null) { - throw new InvalidOperationException ("command cannot be null when Start is called."); + throw new InvalidOperationException("command cannot be null when Start is called."); } if(exceptionHandler == null) { - throw new InvalidOperationException ("exception cannot be null when Start is called."); + throw new InvalidOperationException("exception cannot be null when Start is called."); } this.next.Start(); @@ -141,6 +142,15 @@ { this.next.Dispose(); } + disposed = true; + } + + public bool IsDisposed + { + get + { + return disposed; + } } public CommandHandler Command @@ -149,12 +159,15 @@ set { this.commandHandler = value; } } - public ExceptionHandler Exception + public ExceptionHandler Exception { get { return exceptionHandler; } set { this.exceptionHandler = value; } } + public virtual void Stop() + { + } } } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj?rev=707747&r1=707746&r2=707747&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj Fri Oct 24 14:10:22 2008 @@ -237,6 +237,7 @@ <Compile Include="src\main\csharp\Commands\PartialCommand.cs"> <SubType>Code</SubType> </Compile> + <Compile Include="src\main\csharp\Commands\ProducerAck.cs" /> <Compile 
Include="src\main\csharp\Commands\ProducerId.cs"> <SubType>Code</SubType> </Compile> @@ -303,6 +304,7 @@ <SubType>Code</SubType> </Compile> <Compile Include="src\main\csharp\DispatchingThread.cs" /> + <Compile Include="src\main\csharp\IOException.cs" /> <Compile Include="src\main\csharp\ISynchronization.cs"> <SubType>Code</SubType> </Compile> @@ -700,12 +702,32 @@ <Compile Include="src\main\csharp\Session.cs"> <SubType>Code</SubType> </Compile> + <Compile Include="src\main\csharp\State\CommandVisitorAdapter.cs" /> + <Compile Include="src\main\csharp\State\ConnectionState.cs" /> + <Compile Include="src\main\csharp\State\ConnectionStateTracker.cs" /> + <Compile Include="src\main\csharp\State\ConsumerState.cs" /> + <Compile Include="src\main\csharp\State\ICommandVisitor.cs" /> + <Compile Include="src\main\csharp\State\ProducerState.cs" /> + <Compile Include="src\main\csharp\State\SessionState.cs" /> + <Compile Include="src\main\csharp\State\SynchronizedObjects.cs" /> + <Compile Include="src\main\csharp\State\ThreadSimulator.cs" /> + <Compile Include="src\main\csharp\State\Tracked.cs" /> + <Compile Include="src\main\csharp\State\TransactionState.cs" /> + <Compile Include="src\main\csharp\Threads\DefaultThreadPools.cs" /> + <Compile Include="src\main\csharp\Threads\PooledTaskRunner.cs" /> + <Compile Include="src\main\csharp\Threads\Task.cs" /> + <Compile Include="src\main\csharp\Threads\TaskRunner.cs" /> + <Compile Include="src\main\csharp\Threads\TaskRunnerFactory.cs" /> <Compile Include="src\main\csharp\TransactionContext.cs"> <SubType>Code</SubType> </Compile> + <Compile Include="src\main\csharp\Transport\Failover\BackupTransport.cs" /> + <Compile Include="src\main\csharp\Transport\Failover\FailoverTransport.cs" /> + <Compile Include="src\main\csharp\Transport\Failover\FailoverTransportFactory.cs" /> <Compile Include="src\main\csharp\Transport\FutureResponse.cs"> <SubType>Code</SubType> </Compile> + <Compile Include="src\main\csharp\Transport\ICompositeTransport.cs" 
/> <Compile Include="src\main\csharp\Transport\ITransport.cs"> <SubType>Code</SubType> </Compile> @@ -731,6 +753,7 @@ <Compile Include="src\main\csharp\Transport\Tcp\TcpTransportFactory.cs"> <SubType>Code</SubType> </Compile> + <Compile Include="src\main\csharp\Transport\TransportFactory.cs" /> <Compile Include="src\main\csharp\Transport\TransportFilter.cs"> <SubType>Code</SubType> </Compile>
