Author: tabish
Date: Wed Aug 19 20:25:00 2009
New Revision: 805963
URL: http://svn.apache.org/viewvc?rev=805963&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQNET-180
Updates to the transport layer to improve the ability to test the code, also
some minor cleanup.
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs
(with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Mock/MockTransportFactoryTest.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=805963&r1=805962&r2=805963&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
Wed Aug 19 20:25:00 2009
@@ -257,8 +257,11 @@
{
reconnectMutex.ReleaseMutex();
}
-
- this.Interrupted( transport );
+
+ if( this.Interrupted != null )
+ {
+ this.Interrupted( transport );
+ }
}
}
@@ -736,19 +739,32 @@
return ConnectedTransportURI == null ? "unconnected" :
ConnectedTransportURI.ToString();
}
- public String RemoteAddress
+ public Uri RemoteAddress
{
get
{
- FailoverTransport transport =
ConnectedTransport as FailoverTransport;
- if(transport != null)
+ if(ConnectedTransport != null)
{
- return transport.RemoteAddress;
+ return ConnectedTransport.RemoteAddress;
}
return null;
}
}
+ public Object Narrow(Type type)
+ {
+ if(this.GetType().Equals(type))
+ {
+ return this;
+ }
+ else if(ConnectedTransport != null)
+ {
+ return ConnectedTransport.Narrow(type);
+ }
+
+ return null;
+ }
+
public bool IsFaultTolerant
{
get { return true; }
@@ -801,8 +817,11 @@
ConnectedTransport = t;
connectFailures = 0;
connected = true;
- this.Resumed( t );
-
Tracer.InfoFormat("Successfully reconnected to backup {0}", uri.ToString());
+ if( this.Resumed != null )
+ {
+ this.Resumed( t );
+ }
+ Tracer.InfoFormat("Successfully
reconnected to backup {0}", uri.ToString());
return
false;
}
catch(Exception
e)
@@ -837,7 +856,11 @@
restoreTransport(t);
}
- this.Resumed( t );
+ if( this.Resumed != null )
+ {
+ this.Resumed( t );
+ }
+
Tracer.Debug("Connection established");
ReconnectDelay
= InitialReconnectDelay;
ConnectedTransportURI = uri;
@@ -881,7 +904,6 @@
if(!disposed)
{
-
Tracer.DebugFormat("Waiting {0}ms before
attempting connection.", ReconnectDelay);
try
{
@@ -912,7 +934,6 @@
return !disposed;
}
-
private bool buildBackups()
{
try
@@ -973,7 +994,7 @@
get { return disposed; }
}
- public bool Connected
+ public bool IsConnected
{
get { return connected; }
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs?rev=805963&r1=805962&r2=805963&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
Wed Aug 19 20:25:00 2009
@@ -26,23 +26,87 @@
public delegate void ResumedHandler(ITransport sender);
/// <summary>
- /// Represents the logical networking transport layer.
+ /// Represents the logical networking transport layer. Transports
implment the low
+ /// level protocol specific portion of the Communication between the
Client and a Broker
+ /// such as TCP, UDP, etc. Transports make use of WireFormat objects to
handle translateing
+ /// the cononical OpenWire Commands used in this client into binary wire
level packets that
+ /// can be sent to the Broker or Service that the Transport connects to.
/// </summary>
public interface ITransport : IStartable, IDisposable, IStoppable
{
+ /// <summary>
+ /// Sends a Command object on the Wire but does not wait for any
response from the
+ /// receiver before returning.
+ /// </summary>
+ /// <param name="command">
+ /// A <see cref="Command"/>
+ /// </param>
void Oneway(Command command);
+ /// <summary>
+ /// Sends a Command object which requires a response from the Broker
but does not
+ /// wait for the response, instead a FutureResponse object is returned
that the
+ /// caller can use to wait on the Broker's response.
+ /// </summary>
+ /// <param name="command">
+ /// A <see cref="Command"/>
+ /// </param>
+ /// <returns>
+ /// A <see cref="FutureResponse"/>
+ /// </returns>
FutureResponse AsyncRequest(Command command);
- TimeSpan RequestTimeout
- {
- get;
- set;
- }
-
+ /// <summary>
+ /// Sends a Command to the Broker and waits for a Response to that
Command before
+ /// returning, this version waits indefinitely for a response.
+ /// </summary>
+ /// <param name="command">
+ /// A <see cref="Command"/>
+ /// </param>
+ /// <returns>
+ /// A <see cref="Response"/>
+ /// </returns>
Response Request(Command command);
+
+ /// <summary>
+ /// Sends a Command to the Broker and waits for the given TimeSpan to
expire for a
+ /// response before returning.
+ /// </summary>
+ /// <param name="command">
+ /// A <see cref="Command"/>
+ /// </param>
+ /// <param name="timeout">
+ /// A <see cref="TimeSpan"/>
+ /// </param>
+ /// <returns>
+ /// A <see cref="Response"/>
+ /// </returns>
Response Request(Command command, TimeSpan timeout);
+ /// <summary>
+ /// Allows a caller to find a specific type of Transport in the Chain
of
+ /// Transports that is created. This allows a caller to find a
specific
+ /// object in the Transport chain and set or get properties on that
specific
+ /// instance. If the requested type isn't in the chain than Null is
returned.
+ /// </summary>
+ /// <param name="type">
+ /// A <see cref="Type"/>
+ /// </param>
+ /// <returns>
+ /// A <see cref="System.Object"/>
+ /// </returns>
+ Object Narrow(Type type);
+
+ /// <value>
+ /// The time that the Transport waits before considering a request to
have
+ /// failed and throwing an exception.
+ /// </value>
+ TimeSpan RequestTimeout
+ {
+ get;
+ set;
+ }
+
CommandHandler Command
{
get;
@@ -66,11 +130,42 @@
get;
set;
}
-
+
+ /// <value>
+ /// Indicates if this Transport has already been disposed and can no
longer
+ /// be used.
+ /// </value>
bool IsDisposed
{
get;
}
+
+ /// <value>
+ /// Indicates if this Transport is Fault Tolerant or not. A fault
Tolerant
+ /// Transport handles low level connection errors internally allowing
a client
+ /// to remain unaware of wire level disconnection and reconnection
details.
+ /// </value>
+ bool IsFaultTolerant
+ {
+ get;
+ }
+
+ /// <value>
+ /// Indiciates if the Transport is current Connected to is assigned
URI.
+ /// </value>
+ bool IsConnected
+ {
+ get;
+ }
+
+ /// <value>
+ /// The Remote Address that this transport is currently connected to.
+ /// </value>
+ Uri RemoteAddress
+ {
+ get;
+ }
+
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs?rev=805963&r1=805962&r2=805963&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
Wed Aug 19 20:25:00 2009
@@ -91,11 +91,13 @@
if( this.parent.FailOnReceiveMessage &&
this.parent.NumReceivedMessages >
this.parent.NumReceivedMessagesBeforeFail ) {
+ Tracer.Debug("MockTransport Async Task: Performing
configured receive failure." );
this.parent.Exception(this.parent, new IOException(
"Failed to Receive Message."));
}
}
// Send all the responses.
+ Tracer.Debug("MockTransport Async Task: Simulate receive of
Command: " + command.ToString() );
this.parent.Command(this.parent, command);
return parent.receiveQueue.Count != 0;
@@ -123,6 +125,8 @@
public Response Request(Command command, TimeSpan timeout)
{
+ Tracer.Debug("MockTransport sending Request Command: " +
command.ToString() );
+
if( command.IsMessage ) {
this.numSentMessages++;
@@ -142,10 +146,13 @@
public void Oneway(Command command)
{
+ Tracer.Debug("MockTransport sending oneway Command: " +
command.ToString() );
+
if( command.IsMessage ) {
this.numSentMessages++;
if( this.failOnSendMessage && this.numSentMessages >
this.numSentMessagesBeforeFail ) {
+ Tracer.Debug("MockTransport Oneway send, failing as per
configuration." );
throw new IOException( "Failed to Send Message.");
}
}
@@ -166,7 +173,10 @@
this.asyncResponseTask.wakeup();
// Send the Command to the Outgoing Command Snoop Hook.
- this.OutgoingCommand(this, command);
+ if( this.OutgoingCommand != null ) {
+ Tracer.Debug("MockTransport Oneway, Notifying Outgoing
linstener." );
+ this.OutgoingCommand(this, command);
+ }
}
public FutureResponse AsyncRequest(Command command)
@@ -229,6 +239,16 @@
this.asyncResponseTask.wakeup();
}
+
+ public Object Narrow(Type type)
+ {
+ if( this.GetType().Equals(type) )
+ {
+ return this;
+ }
+
+ return null;
+ }
#region Property Accessors
@@ -313,7 +333,22 @@
get { return numReceivedMessages; }
set { numReceivedMessages = value; }
}
-
+
+ public bool IsFaultTolerant
+ {
+ get{ return false; }
+ }
+
+ public bool IsConnected
+ {
+ get{ return true; }
+ }
+
+ public Uri RemoteAddress
+ {
+ get{ return null; }
+ }
+
#endregion
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=805963&r1=805962&r2=805963&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
Wed Aug 19 20:25:00 2009
@@ -41,6 +41,7 @@
private AtomicBoolean closed = new AtomicBoolean(false);
private volatile bool seenShutdown;
private TimeSpan maxWait =
TimeSpan.FromMilliseconds(Timeout.Infinite);
+ private Uri connectedUri;
private CommandHandler commandHandler;
private ExceptionHandler exceptionHandler;
@@ -48,8 +49,9 @@
private ResumedHandler resumedHandler;
private TimeSpan MAX_THREAD_WAIT =
TimeSpan.FromMilliseconds(30000);
- public TcpTransport(Socket socket, IWireFormat wireformat)
+ public TcpTransport(Uri uri, Socket socket, IWireFormat
wireformat)
{
+ this.connectedUri = uri;
this.socket = socket;
this.wireformat = wireformat;
}
@@ -377,6 +379,32 @@
get { return wireformat; }
set { wireformat = value; }
}
+
+ public bool IsFaultTolerant
+ {
+ get{ return false; }
+ }
+
+ public bool IsConnected
+ {
+ get{ return socket.Connected; }
+ }
+
+ public Uri RemoteAddress
+ {
+ get{ return connectedUri; }
+ }
+
+ public Object Narrow(Type type)
+ {
+ if( this.GetType().Equals(type) )
+ {
+ return this;
+ }
+
+ return null;
+ }
+
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=805963&r1=805962&r2=805963&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
Wed Aug 19 20:25:00 2009
@@ -121,7 +121,7 @@
#endif
IWireFormat wireformat = CreateWireFormat(map);
- ITransport transport = new TcpTransport(socket,
wireformat);
+ ITransport transport = new TcpTransport(location,
socket, wireformat);
wireformat.Transport = transport;
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=805963&r1=805962&r2=805963&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
Wed Aug 19 20:25:00 2009
@@ -182,6 +182,32 @@
public virtual void Stop()
{
}
- }
+
+ public Object Narrow(Type type)
+ {
+ if( this.GetType().Equals( type ) ) {
+ return this;
+ } else if( this.next != null ) {
+ return this.next.Narrow( type );
+ }
+
+ return null;
+ }
+
+ public bool IsFaultTolerant
+ {
+ get{ return next.IsFaultTolerant; }
+ }
+
+ public bool IsConnected
+ {
+ get{ return next.IsConnected; }
+ }
+
+ public Uri RemoteAddress
+ {
+ get{ return next.RemoteAddress; }
+ }
+ }
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Mock/MockTransportFactoryTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Mock/MockTransportFactoryTest.cs?rev=805963&r1=805962&r2=805963&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Mock/MockTransportFactoryTest.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Mock/MockTransportFactoryTest.cs
Wed Aug 19 20:25:00 2009
@@ -38,6 +38,13 @@
ITransport transport = factory.CreateTransport(location);
Assert.IsNotNull(transport);
+
+ Assert.IsInstanceOfType(typeof(MockTransport),
transport.Narrow(typeof(MockTransport)));
+
+ MockTransport mock = (MockTransport)
transport.Narrow(typeof(MockTransport));
+
+ Assert.IsTrue( mock.IsConnected );
+ Assert.IsFalse( mock.IsFaultTolerant );
}
[Test]
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs?rev=805963&view=auto
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs
(added)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs
Wed Aug 19 20:25:00 2009
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
+using Apache.NMS;
+using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.ActiveMQ.Transport.Mock;
+using Apache.NMS.ActiveMQ.Transport.Failover;
+using Apache.NMS.ActiveMQ.Commands;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+ internal class ConsoleTracer : ITrace
+ {
+ public bool IsDebugEnabled { get { return true; } }
+ public bool IsInfoEnabled { get { return true; } }
+ public bool IsWarnEnabled { get { return true; } }
+ public bool IsErrorEnabled { get { return true; } }
+ public bool IsFatalEnabled { get { return true; } }
+ public void Debug(string message) { Console.WriteLine("DEBUG:" +
message); }
+ public void Info(string message) { Console.WriteLine("INFO:" +
message); }
+ public void Warn(string message) { Console.WriteLine("WARN:" +
message); }
+ public void Error(string message) { Console.WriteLine("ERROR:" +
message); }
+ public void Fatal(string message) { Console.WriteLine("FATAL:" +
message); }
+ }
+
+ [TestFixture]
+ public class FailoverTransportTest
+ {
+ private List<Command> received = new List<Command>();
+ private List<Exception> exceptions = new List<Exception>();
+
+ public void OnException(ITransport transport, Exception exception)
+ {
+ exceptions.Add( exception );
+ }
+
+ public void OnCommand(ITransport transport, Command command)
+ {
+ received.Add( command );
+ }
+
+ [Test]
+ public void FailoverTransportCreateTest()
+ {
+ Uri uri = new
Uri("failover:(mock://localhost:61616)?randomize=false");
+ Tracer.Trace = new ConsoleTracer();
+
+ FailoverTransportFactory factory = new FailoverTransportFactory();
+
+ ITransport transport = factory.CreateTransport(uri);
+ Assert.IsNotNull(transport);
+
+ transport.Command = new CommandHandler(OnCommand);
+ transport.Exception = new ExceptionHandler(OnException);
+
+ FailoverTransport failover = (FailoverTransport)
transport.Narrow(typeof(FailoverTransport));
+ Assert.IsNotNull(failover);
+ Assert.IsFalse(failover.Randomize);
+
+ transport.Start();
+
+ Thread.Sleep(1000);
+ Assert.IsTrue(failover.IsConnected);
+
+ transport.Stop();
+
+ }
+ }
+}
Propchange:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs
------------------------------------------------------------------------------
svn:eol-style = native