Author: tabish
Date: Tue Jul 27 15:58:52 2010
New Revision: 979759

URL: http://svn.apache.org/viewvc?rev=979759&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-266

* ITransport.cs:
* TransportFilter.cs:
* Tcp/TcpTransport.cs:
* Mock/MockTransport.cs:
* ICompositeTransport.cs:
* Failover/BackupTransport.cs:
* Failover/FailoverTransport.cs:
* Failover/FailoverTransportFactory.cs: 

Modified:
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
 Tue Jul 27 15:58:52 2010
@@ -41,7 +41,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
             this.disposed = true;
             if(failoverTransport != null)
             {
-                this.failoverTransport.Reconnect();
+                this.failoverTransport.Reconnect(false);
             }
         }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
 Tue Jul 27 15:58:52 2010
@@ -38,6 +38,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
         private bool disposed;
         private bool connected;
         private List<Uri> uris = new List<Uri>();
+               private List<Uri> updated = new List<Uri>();
         private CommandHandler commandHandler;
         private ExceptionHandler exceptionHandler;
         private InterruptedHandler interruptedHandler;
@@ -76,7 +77,9 @@ namespace Apache.NMS.ActiveMQ.Transport.
         private int maxCacheSize = 256;
         private volatile Exception failure;
         private readonly object mutex = new object();
-
+               private bool reconnectSupported = true;
+               private bool updateURIsSupported = true;
+               
         public FailoverTransport()
         {
             id = idCounter++;
@@ -303,7 +306,17 @@ namespace Apache.NMS.ActiveMQ.Transport.
         {
             get { return started; }
         }
-                   
+
+           public bool IsReconnectSupported
+               {
+                       get{ return this.reconnectSupported; }
+               }
+
+           public bool IsUpdateURIsSupported
+               {
+                       get{ return this.updateURIsSupported; }
+               }
+               
         /// <summary>
         /// </summary>
         /// <param name="command"></param>
@@ -399,7 +412,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
                 }
                 else
                 {
-                    Reconnect();
+                    Reconnect(false);
                 }
             }
         }
@@ -499,22 +512,13 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
                 if(!initialized)
                 {
-                    if(command.IsBrokerInfo)
-                    {
-                        BrokerInfo info = (BrokerInfo) command;
-                        BrokerInfo[] peers = info.PeerBrokerInfos;
-                        if(peers != null)
-                        {
-                            for(int i = 0; i < peers.Length; i++)
-                            {
-                                String brokerString = peers[i].BrokerURL;
-                                Add(brokerString);
-                            }
-                        }
-
-                        initialized = true;
-                    }
+                    initialized = true;
                 }
+                               
+                               if(command.IsConnectionControl)
+                               {
+                                       this.HandleConnectionControl(command as 
ConnectionControl);
+                               }
             }
 
             this.Command(sender, command);
@@ -669,7 +673,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
             }
         }
 
-        public void Add(Uri[] u)
+        public void Add(bool rebalance, Uri[] u)
         {
             lock(uris)
             {
@@ -682,10 +686,10 @@ namespace Apache.NMS.ActiveMQ.Transport.
                 }
             }
 
-            Reconnect();
+            Reconnect(rebalance);
         }
 
-        public void Remove(Uri[] u)
+        public void Remove(bool rebalance, Uri[] u)
         {
             lock(uris)
             {
@@ -695,10 +699,10 @@ namespace Apache.NMS.ActiveMQ.Transport.
                 }
             }
 
-            Reconnect();
+            Reconnect(rebalance);
         }
 
-        public void Add(String u)
+        public void Add(bool rebalance, String u)
         {
             try
             {
@@ -708,10 +712,9 @@ namespace Apache.NMS.ActiveMQ.Transport.
                     if(!uris.Contains(uri))
                     {
                         uris.Add(uri);
+                                               Reconnect(rebalance);
                     }
-                }
-
-                Reconnect();
+                }                              
             }
             catch(Exception e)
             {
@@ -721,10 +724,10 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
         public void Reconnect(Uri uri)
         {
-            Add(new Uri[] { uri });
+            Add(true, new Uri[] { uri });
         }
         
-        public void Reconnect()
+        public void Reconnect(bool rebalance)
         {
             lock(reconnectMutex)
             {
@@ -1161,7 +1164,118 @@ namespace Apache.NMS.ActiveMQ.Transport.
             }
         }
 
-        public void Dispose()
+               
+               public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
+               {               
+               if(IsUpdateURIsSupported)
+                       {
+                   List<Uri> copy = new List<Uri>(this.updated);
+                   List<Uri> added = new List<Uri>();
+                               
+                   if(updatedURIs != null && updatedURIs.Length > 0) 
+                               {
+                       HashSet<Uri> uriSet = new HashSet<Uri>();
+                       for(int i = 0; i < updatedURIs.Length; i++) 
+                                       {
+                           Uri uri = updatedURIs[i];
+                           if(uri != null) 
+                                               {
+                               uriSet.Add(uri);
+                           }
+                       }
+                                       
+                       foreach(Uri uri in uriSet)
+                                       {
+                           if(copy.Remove(uri) == false) 
+                                               {
+                               uriSet.Add(uri);
+                           }
+                       }
+                                       
+                       lock(reconnectMutex) 
+                                       {
+                           this.updated.Clear();
+                           this.updated.AddRange(added);
+                           
+                                               foreach(Uri uri in copy) 
+                                               {
+                               this.uris.Remove(uri);
+                           }
+                           
+                                               this.Add(rebalance, 
added.ToArray());
+                       }
+                   }
+               }                       
+               }
+
+           public void HandleConnectionControl(ConnectionControl control) 
+               {
+               string reconnectStr = control.ReconnectTo;
+               
+                       if(reconnectStr != null) 
+                       {
+                   reconnectStr = reconnectStr.Trim();
+                   if(reconnectStr.Length > 0) 
+                               {
+                       try 
+                                       {
+                           Uri uri = new Uri(reconnectStr);
+                           if(IsReconnectSupported) 
+                                               {
+                               Reconnect(uri);
+                               Tracer.Info("Reconnected to: " + 
uri.OriginalString);
+                           }
+                       } 
+                                       catch(Exception e) 
+                                       {
+                           Tracer.ErrorFormat("Failed to handle 
ConnectionControl reconnect to {0}: {1}", reconnectStr, e);
+                       }
+                   }
+               }
+                       
+               ProcessNewTransports(control.RebalanceConnection, 
control.ConnectedBrokers);
+           }
+       
+           private void ProcessNewTransports(bool rebalance, String 
newTransports) 
+               {
+               if(newTransports != null) 
+                       {
+                   newTransports = newTransports.Trim();
+                               
+                   if(newTransports.Length > 0 && IsUpdateURIsSupported) 
+                               {
+                       List<Uri> list = new List<Uri>();
+                       String[] tokens = newTransports.Split(new Char []{','});
+                       
+                                       foreach(String str in tokens)
+                                       {
+                           try 
+                                               {
+                               Uri uri = new Uri(str);
+                               list.Add(uri);
+                           } 
+                                               catch 
+                                               {
+                               Tracer.Error("Failed to parse broker address: " 
+ str);
+                           }
+                       }
+                                       
+                       if(list.Count != 0)
+                                       {
+                           try 
+                                               {
+                               UpdateURIs(rebalance, list.ToArray());
+                           } 
+                                               catch
+                                               {
+                               Tracer.Error("Failed to update transport URI's 
from: " + newTransports);
+                           }
+                       }
+                   }
+               }
+           }
+               
+               public void Dispose()
         {
             Dispose(true);
             GC.SuppressFinalize(this);

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
 Tue Jul 27 15:58:52 2010
@@ -54,7 +54,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
                {
                        StringDictionary options = compositData.Parameters;
                        FailoverTransport transport = CreateTransport(options);
-                       transport.Add(compositData.Components);
+                       transport.Add(false, compositData.Components);
                        return transport;
                }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs
 Tue Jul 27 15:58:52 2010
@@ -21,8 +21,29 @@ namespace Apache.NMS.ActiveMQ.Transport
 {
        public interface ICompositeTransport : ITransport
        {
-               void Add(Uri[] uris);
-               void Remove(Uri[] uris);
+               /// <summary>
+               /// Adds a new set of Uris to the list of Uris that this 
Transport can connect to. 
+               /// </summary>
+               /// <param name="rebalance">
+               /// A <see cref="System.Boolean"/>
+               /// Should the current connection be broken and a new one 
created.
+               /// </param>
+               /// <param name="uris">
+               /// A <see cref="Uri[]"/>
+               /// </param>
+               void Add(bool rebalance, Uri[] uris);
+
+               /// <summary>
+               /// Remove the given Uris from this Transports list of known 
Uris. 
+               /// </summary>
+               /// <param name="rebalance">
+               /// A <see cref="System.Boolean"/>
+               /// Should the current connection be broken and a new one 
created.
+               /// </param>
+               /// <param name="uris">
+               /// A <see cref="Uri[]"/>
+               /// </param>
+               void Remove(bool rebalance, Uri[] uris);
        }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
 Tue Jul 27 15:58:52 2010
@@ -156,6 +156,36 @@ namespace Apache.NMS.ActiveMQ.Transport
             get;
         }
         
+               /// <summary>
+               /// Returns true if this Transport supports reconnections.
+               /// </summary>
+           bool IsReconnectSupported
+               {
+                       get;
+               }
+           
+               /// <summary>
+               /// Returns true if this Transport can accept updated lists of 
connection Uri's.
+               /// </summary>
+           bool IsUpdateURIsSupported
+               {
+                       get;
+               }
+               
+               /// <summary>
+               /// Updates the Uri's that this Transport is aware of and will 
use to
+               /// connect itself to.  If the rebalance option is true this 
method will
+               /// terminate any current connection and reconnect to another 
available
+               /// Uri.
+               /// </summary>
+               /// <param name="rebalance">
+               /// A <see cref="System.Boolean"/>
+               /// </param>
+               /// <param name="updatedURIs">
+               /// A <see cref="Uri[]"/>
+               /// </param>
+               void UpdateURIs(bool rebalance, Uri[] updatedURIs);
+               
        }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
 Tue Jul 27 15:58:52 2010
@@ -372,6 +372,21 @@ namespace Apache.NMS.ActiveMQ.Transport.
             get{ return new Uri("mock://mock"); }
         }
         
+           public bool IsReconnectSupported
+               {
+                       get{ return false; }
+               }
+           
+           public bool IsUpdateURIsSupported
+               {
+                       get{ return false; }
+               }
+               
+               public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
+               {
+                       throw new IOException();
+               }
+               
         #endregion
        }
 }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
 Tue Jul 27 15:58:52 2010
@@ -365,6 +365,20 @@ namespace Apache.NMS.ActiveMQ.Transport.
                        return null;
                }
 
+           public bool IsReconnectSupported
+               {
+                       get{ return false; }
+               }
+           
+           public bool IsUpdateURIsSupported
+               {
+                       get{ return false; }
+               }
+               
+               public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
+               {
+                       throw new IOException();
+               }               
        }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=979759&r1=979758&r2=979759&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
 Tue Jul 27 15:58:52 2010
@@ -217,6 +217,22 @@ namespace Apache.NMS.ActiveMQ.Transport
         {
             get{ return next.RemoteAddress; }
         }
+               
+           public bool IsReconnectSupported
+               {
+                       get{ return next.IsReconnectSupported; }
+               }
+           
+           public bool IsUpdateURIsSupported
+               {
+                       get{ return next.IsUpdateURIsSupported; }
+               }
+               
+               public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
+               {
+                       next.UpdateURIs(rebalance, updatedURIs);
+               }
+               
     }
 }
 


Reply via email to