Author: tabish
Date: Thu Aug 20 20:14:01 2009
New Revision: 806337
URL: http://svn.apache.org/viewvc?rev=806337&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQNET-180
Add some more tests and improve the existing ones. Updates the
FailoverTransport to actually send on exceptions to the listener when the
maxReconnectionAttempts condition is met.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransportFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Mock/MockTransportFactoryTest.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Mock/MockTransportTest.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=806337&r1=806336&r2=806337&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
Thu Aug 20 20:14:01 2009
@@ -893,7 +893,7 @@
{
Tracer.ErrorFormat("Failed to connect
to transport after {0} attempt(s)", connectFailures);
connectionFailure = failure;
- onException(this, connectionFailure);
+ this.Exception(this, connectionFailure);
return false;
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs?rev=806337&r1=806336&r2=806337&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
Thu Aug 20 20:14:01 2009
@@ -136,7 +136,10 @@
}
// Notify external Client of command that we "sent"
- this.OutgoingCommand(this, command);
+ if( this.OutgoingCommand != null )
+ {
+ this.OutgoingCommand(this, command);
+ }
command.CommandId = Interlocked.Increment(ref this.nextCommandId);
command.ResponseRequired = true;
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransportFactory.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransportFactory.cs?rev=806337&r1=806336&r2=806337&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransportFactory.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransportFactory.cs
Thu Aug 20 20:14:01 2009
@@ -83,6 +83,13 @@
get { return numSentMessagesBeforeFail ; }
set { numSentMessagesBeforeFail = value; }
}
+
+ private bool failOnCreate = false;
+ public bool FailOnCreate
+ {
+ get{ return failOnCreate; }
+ set{ this.failOnCreate = value; }
+ }
#endregion
@@ -99,6 +106,8 @@
public ITransport CompositeConnect(Uri location)
{
+ Tracer.Debug("MockTransportFactory: Create new Transport with
options: " + location.Query);
+
// Extract query parameters from broker Uri
StringDictionary map =
URISupport.ParseQuery(location.Query);
@@ -110,6 +119,11 @@
{
throw new IOException("Unsupported WireFormat Supplied for
MockTransport");
}
+
+ if(this.FailOnCreate == true)
+ {
+ throw new IOException("Failed to Create new MockTransport.");
+ }
// Create the Mock Transport
MockTransport transport = new MockTransport();
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Mock/MockTransportFactoryTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Mock/MockTransportFactoryTest.cs?rev=806337&r1=806336&r2=806337&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Mock/MockTransportFactoryTest.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Mock/MockTransportFactoryTest.cs
Thu Aug 20 20:14:01 2009
@@ -59,6 +59,18 @@
Assert.IsNotNull(transport);
Assert.IsTrue(transport.FailOnSendMessage);
Assert.AreEqual(20, transport.NumSentMessagesBeforeFail);
- }
+ }
+
+ [Test]
+ [ExpectedException( "Apache.NMS.ActiveMQ.IOException" )]
+ public void CreationFailMockTransportTest()
+ {
+ MockTransportFactory factory = new MockTransportFactory();
+
+ Uri location = new
Uri("mock://0.0.0.0:61616?transport.failOnCreate=true");
+
+ factory.CreateTransport(location);
+ }
+
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Mock/MockTransportTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Mock/MockTransportTest.cs?rev=806337&r1=806336&r2=806337&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Mock/MockTransportTest.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Mock/MockTransportTest.cs
Thu Aug 20 20:14:01 2009
@@ -39,16 +39,19 @@
public void OnException(ITransport transport, Exception exception)
{
+ Tracer.DebugFormat("MockTransportTest::onException - " + exception
);
exceptions.Add( exception );
}
public void OnCommand(ITransport transport, Command command)
{
+ Tracer.DebugFormat("MockTransportTest::OnCommand - " + command );
received.Add( command );
}
public void OnOutgoingCommand(ITransport transport, Command command)
{
+ Tracer.DebugFormat("MockTransportTest::OnOutgoingCommand - " +
command );
sent.Add( command );
}
@@ -98,6 +101,17 @@
ActiveMQTextMessage message = new ActiveMQTextMessage();
transport.Oneway( message );
Assert.IsTrue(transport.NumSentMessages == 1);
+ Assert.Contains(message, sent);
+ }
+
+ [Test]
+ public void RequestMessageTest()
+ {
+ transport.Start();
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ transport.Request( message );
+ Assert.IsTrue(transport.NumSentMessages == 1);
+ Assert.Contains(message, sent);
}
[Test]
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs?rev=806337&r1=806336&r2=806337&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs
Thu Aug 20 20:14:01 2009
@@ -29,41 +29,42 @@
namespace Apache.NMS.ActiveMQ.Test
{
- internal class ConsoleTracer : ITrace
- {
- public bool IsDebugEnabled { get { return true; } }
- public bool IsInfoEnabled { get { return true; } }
- public bool IsWarnEnabled { get { return true; } }
- public bool IsErrorEnabled { get { return true; } }
- public bool IsFatalEnabled { get { return true; } }
- public void Debug(string message) { Console.WriteLine("DEBUG:" +
message); }
- public void Info(string message) { Console.WriteLine("INFO:" +
message); }
- public void Warn(string message) { Console.WriteLine("WARN:" +
message); }
- public void Error(string message) { Console.WriteLine("ERROR:" +
message); }
- public void Fatal(string message) { Console.WriteLine("FATAL:" +
message); }
- }
-
[TestFixture]
public class FailoverTransportTest
{
- private List<Command> received = new List<Command>();
- private List<Exception> exceptions = new List<Exception>();
+ private List<Command> received;
+ private List<Exception> exceptions;
+
+ int sessionIdx = 1;
+ int consumerIdx = 1;
+ int producerIdx = 1;
public void OnException(ITransport transport, Exception exception)
{
+ Tracer.Debug("Test: Received Exception from Transport: " +
exception );
exceptions.Add( exception );
}
public void OnCommand(ITransport transport, Command command)
{
+ Tracer.Debug("Test: Received Command from Transport: " + command );
received.Add( command );
}
-
+
+ [SetUp]
+ public void init()
+ {
+ this.received = new List<Command>();
+ this.exceptions = new List<Exception>();
+ this.sessionIdx = 1;
+ this.consumerIdx = 1;
+ this.producerIdx = 1;
+ }
+
[Test]
public void FailoverTransportCreateTest()
{
Uri uri = new
Uri("failover:(mock://localhost:61616)?randomize=false");
- Tracer.Trace = new ConsoleTracer();
FailoverTransportFactory factory = new FailoverTransportFactory();
@@ -83,7 +84,431 @@
Assert.IsTrue(failover.IsConnected);
transport.Stop();
+ transport.Dispose();
+ }
+
+ [Test]
+ public void FailoverTransportWithBackupsTest()
+ {
+ Uri uri = new
Uri("failover:(mock://localhost:61616,mock://localhost:61618)?randomize=false&backup=true");
+
+ FailoverTransportFactory factory = new FailoverTransportFactory();
+ ITransport transport = factory.CreateTransport( uri );
+ Assert.IsNotNull( transport );
+ transport.Command = new CommandHandler(OnCommand);
+ transport.Exception = new ExceptionHandler(OnException);
+
+ FailoverTransport failover = (FailoverTransport)
transport.Narrow(typeof(FailoverTransport));
+ Assert.IsNotNull(failover);
+ Assert.IsFalse(failover.Randomize);
+ Assert.IsTrue(failover.Backup);
+
+ transport.Start();
+
+ Thread.Sleep(1000);
+ Assert.IsTrue(failover.IsConnected);
+
+ transport.Stop();
+ transport.Dispose();
}
+
+ [Test]
+ public void FailoverTransportCreateFailOnCreateTest()
+ {
+ Uri uri = new
Uri("failover:(mock://localhost:61616?transport.failOnCreate=true)?" +
+
"useExponentialBackOff=false&maxReconnectAttempts=3&initialReconnectDelay=100");
+
+ FailoverTransportFactory factory = new FailoverTransportFactory();
+
+ ITransport transport = factory.CreateTransport( uri );
+ Assert.IsNotNull( transport );
+ transport.Command = new CommandHandler(OnCommand);
+ transport.Exception = new ExceptionHandler(OnException);
+
+ FailoverTransport failover = (FailoverTransport)
transport.Narrow(typeof(FailoverTransport));
+ Assert.IsNotNull(failover);
+ Assert.IsTrue(failover.MaxReconnectAttempts == 3);
+
+ transport.Start();
+
+ Thread.Sleep(2000);
+ Assert.IsNotEmpty(this.exceptions);
+ Assert.IsFalse(failover.IsConnected);
+
+ transport.Stop();
+ transport.Dispose();
+ }
+
+ [Test]
+ public void FailoverTransportFailOnSendMessageTest()
+ {
+ Uri uri = new
Uri("failover:(mock://localhost:61616?transport.failOnCreate=true)?" +
+
"useExponentialBackOff=false&maxReconnectAttempts=3&initialReconnectDelay=100");
+
+ FailoverTransportFactory factory = new FailoverTransportFactory();
+
+ ITransport transport = factory.CreateTransport( uri );
+ Assert.IsNotNull( transport );
+ transport.Command = new CommandHandler(OnCommand);
+ transport.Exception = new ExceptionHandler(OnException);
+
+ FailoverTransport failover = (FailoverTransport)
transport.Narrow(typeof(FailoverTransport));
+ Assert.IsNotNull(failover);
+ Assert.IsTrue(failover.MaxReconnectAttempts == 3);
+
+ transport.Start();
+
+ try{
+ ActiveMQMessage message = new ActiveMQMessage();
+ transport.Oneway(message);
+
+ Assert.Fail("Oneway call should block and then throw.");
+ }
+ catch(Exception)
+ {
+ }
+
+ Assert.IsNotEmpty(this.exceptions);
+ Assert.IsFalse(failover.IsConnected);
+
+ transport.Stop();
+ transport.Dispose();
+ }
+
+ [Test]
+ public void FailoverTransportFailingBackupsTest()
+ {
+ Uri uri = new Uri(
+ "failover:(mock://localhost:61616," +
+
"mock://localhost:61618?transport.failOnCreate=true)?randomize=false&backup=true");
+
+ FailoverTransportFactory factory = new FailoverTransportFactory();
+
+ ITransport transport = factory.CreateTransport( uri );
+ Assert.IsNotNull( transport );
+ transport.Command = new CommandHandler(OnCommand);
+ transport.Exception = new ExceptionHandler(OnException);
+
+ FailoverTransport failover = (FailoverTransport)
transport.Narrow(typeof(FailoverTransport));
+ Assert.IsNotNull(failover);
+ Assert.IsTrue(failover.Backup = true);
+
+ transport.Start();
+
+ Thread.Sleep(2000);
+
+ Assert.IsTrue(failover.IsConnected);
+
+ transport.Stop();
+ transport.Dispose();
+ }
+
+ [Test]
+ public void FailoverTransportSendOnewayMessageTest()
+ {
+ int numMessages = 1000;
+ Uri uri = new Uri(
+ "failover:(mock://localhost:61616)?randomize=false");
+
+ FailoverTransportFactory factory = new FailoverTransportFactory();
+
+ ITransport transport = factory.CreateTransport( uri );
+ Assert.IsNotNull( transport );
+ transport.Command = new CommandHandler(OnCommand);
+ transport.Exception = new ExceptionHandler(OnException);
+
+ FailoverTransport failover = (FailoverTransport)
transport.Narrow(typeof(FailoverTransport));
+ Assert.IsNotNull(failover);
+ Assert.IsFalse(failover.Randomize);
+
+ transport.Start();
+
+ Thread.Sleep(1000);
+
+ Assert.IsTrue(failover.IsConnected);
+
+ MockTransport mock = null;
+ while(mock == null ) {
+ mock = (MockTransport) transport.Narrow(typeof(MockTransport));
+ }
+ mock.OutgoingCommand = new CommandHandler(OnCommand);
+
+ ActiveMQMessage message = new ActiveMQMessage();
+ for(int i = 0; i < numMessages; ++i) {
+ transport.Oneway(message);
+ }
+
+ Thread.Sleep(2000);
+
+ Assert.IsTrue(this.received.Count == numMessages);
+
+ transport.Stop();
+ transport.Dispose();
+ }
+
+ [Test]
+ public void FailoverTransportSendRequestTest()
+ {
+ Uri uri = new Uri(
+ "failover:(mock://localhost:61616)?randomize=false");
+
+ FailoverTransportFactory factory = new FailoverTransportFactory();
+
+ ITransport transport = factory.CreateTransport( uri );
+ Assert.IsNotNull( transport );
+ transport.Command = new CommandHandler(OnCommand);
+ transport.Exception = new ExceptionHandler(OnException);
+
+ FailoverTransport failover = (FailoverTransport)
transport.Narrow(typeof(FailoverTransport));
+ Assert.IsNotNull(failover);
+ Assert.IsFalse(failover.Randomize);
+
+ transport.Start();
+
+ Thread.Sleep(1000);
+
+ Assert.IsTrue(failover.IsConnected);
+
+ MockTransport mock = null;
+ while(mock == null ) {
+ mock = (MockTransport) transport.Narrow(typeof(MockTransport));
+ }
+ mock.OutgoingCommand = new CommandHandler(OnCommand);
+
+ ActiveMQMessage message = new ActiveMQMessage();
+
+ transport.Request(message);
+ transport.Request(message);
+ transport.Request(message);
+ transport.Request(message);
+
+ Thread.Sleep(1000);
+
+ Assert.IsTrue(this.received.Count == 4);
+
+ transport.Stop();
+ transport.Dispose();
+ }
+
+ [Test]
+ public void FailoverTransportSendOnewayFailTest()
+ {
+ Uri uri = new Uri(
+ "failover:(mock://localhost:61616?failOnSendMessage=true," +
+ "mock://localhost:61618)?randomize=false");
+
+ FailoverTransportFactory factory = new FailoverTransportFactory();
+
+ ITransport transport = factory.CreateTransport( uri );
+ Assert.IsNotNull( transport );
+ transport.Command = new CommandHandler(OnCommand);
+ transport.Exception = new ExceptionHandler(OnException);
+
+ FailoverTransport failover = (FailoverTransport)
transport.Narrow(typeof(FailoverTransport));
+ Assert.IsNotNull(failover);
+ Assert.IsFalse(failover.Randomize);
+
+ transport.Start();
+
+ Thread.Sleep(1000);
+
+ Assert.IsTrue(failover.IsConnected);
+
+ MockTransport mock = null;
+ while(mock == null ) {
+ mock = (MockTransport) transport.Narrow(typeof(MockTransport));
+ }
+ mock.OutgoingCommand = new CommandHandler(OnCommand);
+
+ ActiveMQMessage message = new ActiveMQMessage();
+
+ transport.Oneway(message);
+ transport.Oneway(message);
+ transport.Oneway(message);
+ transport.Oneway(message);
+
+ Thread.Sleep(1000);
+
+ Assert.IsTrue(this.received.Count == 4);
+
+ transport.Stop();
+ transport.Dispose();
+ }
+
+ [Test]
+ public void FailoverTransportSendRequestFailTest()
+ {
+ Uri uri = new Uri(
+ "failover:(mock://localhost:61616?failOnSendMessage=true," +
+ "mock://localhost:61618)?randomize=false");
+
+ FailoverTransportFactory factory = new FailoverTransportFactory();
+
+ ITransport transport = factory.CreateTransport( uri );
+ Assert.IsNotNull( transport );
+ transport.Command = new CommandHandler(OnCommand);
+ transport.Exception = new ExceptionHandler(OnException);
+
+ FailoverTransport failover = (FailoverTransport)
transport.Narrow(typeof(FailoverTransport));
+ Assert.IsNotNull(failover);
+ Assert.IsFalse(failover.Randomize);
+
+ transport.Start();
+
+ Thread.Sleep(1000);
+
+ Assert.IsTrue(failover.IsConnected);
+
+ MockTransport mock = null;
+ while(mock == null ) {
+ mock = (MockTransport) transport.Narrow(typeof(MockTransport));
+ }
+ mock.OutgoingCommand = new CommandHandler(OnCommand);
+
+ ActiveMQMessage message = new ActiveMQMessage();
+
+ transport.Request(message);
+ transport.Request(message);
+ transport.Request(message);
+ transport.Request(message);
+
+ Thread.Sleep(1000);
+
+ Assert.IsTrue(this.received.Count == 4);
+
+ transport.Stop();
+ transport.Dispose();
+ }
+
+ [Test]
+ public void OpenWireCommandsTest() {
+
+ Uri uri = new
Uri("failover:(mock://localhost:61616)?randomize=false");
+
+ FailoverTransportFactory factory = new FailoverTransportFactory();
+
+ ITransport transport = factory.CreateTransport( uri );
+ Assert.IsNotNull( transport );
+ transport.Command = new CommandHandler(OnCommand);
+ transport.Exception = new ExceptionHandler(OnException);
+
+ FailoverTransport failover = (FailoverTransport)
transport.Narrow(typeof(FailoverTransport));
+ Assert.IsNotNull(failover);
+ Assert.IsFalse(failover.Randomize);
+
+ transport.Start();
+
+ Thread.Sleep(1000);
+
+ Assert.IsTrue(failover.IsConnected);
+
+ ConnectionInfo connection = createConnection();
+ transport.Request( connection );
+ SessionInfo session1 = createSession( connection );
+ transport.Request( session1 );
+ SessionInfo session2 = createSession( connection );
+ transport.Request( session2 );
+ ConsumerInfo consumer1 = createConsumer( session1 );
+ transport.Request( consumer1 );
+ ConsumerInfo consumer2 = createConsumer( session1 );
+ transport.Request( consumer2 );
+ ConsumerInfo consumer3 = createConsumer( session2 );
+ transport.Request( consumer3 );
+
+ ProducerInfo producer1 = createProducer( session2 );
+ transport.Request( producer1 );
+
+ // Remove the Producers
+ disposeOf( producer1, transport );
+
+ // Remove the Consumers
+ disposeOf( consumer1, transport );
+ disposeOf( consumer2, transport );
+ disposeOf( consumer3, transport );
+
+ // Remove the Session instances.
+ disposeOf( session1, transport );
+ disposeOf( session2, transport );
+
+ // Indicate that we are done.
+ ShutdownInfo shutdown = new ShutdownInfo();
+ transport.Oneway(shutdown);
+
+ transport.Stop();
+ transport.Dispose();
+ }
+
+ protected ConnectionInfo createConnection() {
+
+ ConnectionId id = new ConnectionId();
+ id.Value = Guid.NewGuid().ToString();
+
+ ConnectionInfo info = new ConnectionInfo();
+ info.ClientId = Guid.NewGuid().ToString();
+ info.ConnectionId = id;
+
+ return info;
+ }
+
+ SessionInfo createSession( ConnectionInfo parent ) {
+
+ SessionId id = new SessionId();
+ id.ConnectionId = parent.ConnectionId.Value;
+ id.Value = sessionIdx++;
+
+ SessionInfo info = new SessionInfo();
+ info.SessionId = id;
+
+ return info;
+ }
+
+ ConsumerInfo createConsumer( SessionInfo parent ) {
+
+ ConsumerId id = new ConsumerId();
+ id.ConnectionId = parent.SessionId.ConnectionId;
+ id.SessionId = parent.SessionId.Value;
+ id.Value = consumerIdx++;
+
+ ConsumerInfo info = new ConsumerInfo();
+ info.ConsumerId = id;
+
+ return info;
+ }
+
+ ProducerInfo createProducer( SessionInfo parent ) {
+
+ ProducerId id = new ProducerId();
+ id.ConnectionId = parent.SessionId.ConnectionId;
+ id.SessionId = parent.SessionId.Value;
+ id.Value = producerIdx++;
+
+ ProducerInfo info = new ProducerInfo();
+ info.ProducerId = id;
+
+ return info;
+ }
+
+ void disposeOf( SessionInfo session, ITransport transport ) {
+
+ RemoveInfo command = new RemoveInfo();
+ command.ObjectId = session.SessionId;
+ transport.Oneway( command );
+ }
+
+ void disposeOf( ConsumerInfo consumer, ITransport transport ) {
+
+ RemoveInfo command = new RemoveInfo();
+ command.ObjectId = consumer.ConsumerId;
+ transport.Oneway( command );
+ }
+
+ void disposeOf( ProducerInfo producer, ITransport transport ) {
+
+ RemoveInfo command = new RemoveInfo();
+ command.ObjectId = producer.ProducerId;
+ transport.Oneway( command );
+ }
+
}
}