Author: tabish
Date: Thu Oct 28 21:28:51 2010
New Revision: 1028493
URL: http://svn.apache.org/viewvc?rev=1028493&view=rev
Log:
Refactor the Creation methods to make extending these types simpler.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=1028493&r1=1028492&r2=1028493&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
Thu Oct 28 21:28:51 2010
@@ -17,7 +17,6 @@
using System;
using System.Collections;
-using System.Collections.Specialized;
using System.Threading;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Transport;
@@ -52,6 +51,7 @@ namespace Apache.NMS.ActiveMQ
private readonly ConnectionInfo info;
private TimeSpan requestTimeout; // from connection factory
private BrokerInfo brokerInfo; // from broker
+ private readonly CountDownLatch brokerInfoReceived = new
CountDownLatch(1);
private WireFormatInfo brokerWireFormatInfo; // from broker
private readonly IList sessions = ArrayList.Synchronized(new
ArrayList());
private readonly IDictionary producers = Hashtable.Synchronized(new
Hashtable());
@@ -347,6 +347,15 @@ namespace Apache.NMS.ActiveMQ
get { return brokerWireFormatInfo; }
}
+ public String ResourceManagerId
+ {
+ get
+ {
+ this.brokerInfoReceived.await();
+ return brokerInfo.BrokerId.Value;
+ }
+ }
+
/// <summary>
/// Get/or set the redelivery policy for this connection.
/// </summary>
@@ -427,7 +436,7 @@ namespace Apache.NMS.ActiveMQ
/// </summary>
public ISession CreateSession()
{
- return CreateSession(acknowledgementMode);
+ return CreateAtiveMQSession(acknowledgementMode);
}
/// <summary>
@@ -435,29 +444,21 @@ namespace Apache.NMS.ActiveMQ
/// </summary>
public ISession CreateSession(AcknowledgementMode
sessionAcknowledgementMode)
{
- SessionInfo info = CreateSessionInfo(sessionAcknowledgementMode);
- SyncRequest(info, this.RequestTimeout);
- Session session = new Session(this, info,
sessionAcknowledgementMode, this.dispatchAsync);
-
- // Set propertieDs on session using parameters prefixed with
"session."
- if(!String.IsNullOrEmpty(brokerUri.Query) &&
!brokerUri.OriginalString.EndsWith(")"))
- {
- string query =
brokerUri.Query.Substring(brokerUri.Query.LastIndexOf(")") + 1);
- StringDictionary options =
URISupport.ParseQuery(query);
- options = URISupport.GetProperties(options,
"session.");
- URISupport.SetProperties(session, options);
- }
-
- session.ConsumerTransformer = this.ConsumerTransformer;
- session.ProducerTransformer = this.ProducerTransformer;
+ return CreateAtiveMQSession(sessionAcknowledgementMode);
+ }
- if(IsStarted)
+ protected virtual Session CreateAtiveMQSession(AcknowledgementMode
ackMode)
+ {
+ CheckConnected();
+ return new Session(this, NextSessionId, ackMode);
+ }
+
+ internal void AddSession(Session session)
+ {
+ if(!this.closing)
{
- session.Start();
+ sessions.Add(session);
}
-
- sessions.Add(session);
- return session;
}
internal void RemoveSession(Session session)
@@ -700,6 +701,7 @@ namespace Apache.NMS.ActiveMQ
else if(command is BrokerInfo)
{
this.brokerInfo = (BrokerInfo) command;
+ this.brokerInfoReceived.countDown();
}
else if(command is ShutdownInfo)
{
@@ -803,6 +805,8 @@ namespace Apache.NMS.ActiveMQ
protected void OnException(ITransport sender, Exception exception)
{
+ this.brokerInfoReceived.countDown();
+
if(ExceptionListener != null && !this.closing)
{
try
@@ -893,14 +897,9 @@ namespace Apache.NMS.ActiveMQ
return id;
}
- private SessionInfo CreateSessionInfo(AcknowledgementMode
sessionAcknowledgementMode)
+ protected SessionId NextSessionId
{
- SessionInfo answer = new SessionInfo();
- SessionId sessionId = new SessionId();
- sessionId.ConnectionId = info.ConnectionId.Value;
- sessionId.Value = Interlocked.Increment(ref sessionCounter);
- answer.SessionId = sessionId;
- return answer;
+ get { return new SessionId(this.info.ConnectionId,
Interlocked.Increment(ref this.sessionCounter)); }
}
public ActiveMQTempDestination CreateTemporaryDestination(bool topic)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=1028493&r1=1028492&r2=1028493&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
Thu Oct 28 21:28:51 2010
@@ -98,61 +98,76 @@ namespace Apache.NMS.ActiveMQ
public IConnection CreateConnection()
{
- return CreateConnection(connectionUserName,
connectionPassword);
+ return CreateActiveMQConnection();
}
public IConnection CreateConnection(string userName, string
password)
{
- Connection connection = null;
-
- try
- {
- Tracer.InfoFormat("Connecting to: {0}",
brokerUri.ToString());
-
- ITransport transport =
TransportFactory.CreateTransport(brokerUri);
-
- connection = new Connection(brokerUri,
transport, this.ClientIdGenerator);
-
- ConfigureConnection(connection);
-
- connection.UserName = userName;
- connection.Password = password;
-
- if(this.clientId != null)
- {
- connection.DefaultClientId =
this.clientId;
- }
-
- connection.ITransport.Start();
-
- return connection;
- }
- catch(NMSException e)
- {
- try
- {
- connection.Close();
- }
- catch
- {
- }
-
- throw e;
- }
- catch(Exception e)
- {
- try
- {
- connection.Close();
- }
- catch
- {
- }
-
- throw NMSExceptionSupport.Create("Could not
connect to broker URL: " + this.brokerUri + ". Reason: " + e.Message, e);
- }
+ return CreateActiveMQConnection(userName, password);
}
+ protected virtual Connection CreateActiveMQConnection()
+ {
+ return CreateActiveMQConnection(connectionUserName,
connectionPassword);
+ }
+
+ protected virtual Connection CreateActiveMQConnection(string userName,
string password)
+ {
+ Connection connection = null;
+
+ try
+ {
+ Tracer.InfoFormat("Connecting to: {0}", brokerUri.ToString());
+
+ ITransport transport =
TransportFactory.CreateTransport(brokerUri);
+
+ connection = CreateActiveMQConnection(transport);
+
+ ConfigureConnection(connection);
+
+ connection.UserName = userName;
+ connection.Password = password;
+
+ if(this.clientId != null)
+ {
+ connection.DefaultClientId = this.clientId;
+ }
+
+ connection.ITransport.Start();
+
+ return connection;
+ }
+ catch(NMSException e)
+ {
+ try
+ {
+ connection.Close();
+ }
+ catch
+ {
+ }
+
+ throw e;
+ }
+ catch(Exception e)
+ {
+ try
+ {
+ connection.Close();
+ }
+ catch
+ {
+ }
+
+ throw NMSExceptionSupport.Create("Could not connect to broker
URL: " + this.brokerUri + ". Reason: " + e.Message, e);
+ }
+ }
+
+ protected virtual Connection CreateActiveMQConnection(ITransport
transport)
+ {
+ return new Connection(this.brokerUri, transport,
this.ClientIdGenerator);
+ }
+
#region ConnectionFactory Properties
/// <summary>
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1028493&r1=1028492&r2=1028493&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
Thu Oct 28 21:28:51 2010
@@ -17,7 +17,9 @@
using System;
using System.Collections;
+using System.Collections.Specialized;
using System.Threading;
+using Apache.NMS.Util;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Util;
@@ -59,20 +61,44 @@ namespace Apache.NMS.ActiveMQ
private TimeSpan requestTimeout;
private readonly AcknowledgementMode acknowledgementMode;
- public Session(Connection connection, SessionInfo info,
AcknowledgementMode acknowledgementMode, bool dispatchAsync)
+ public Session(Connection connection, SessionId sessionId,
AcknowledgementMode acknowledgementMode)
{
+ this.info = new SessionInfo();
+ this.info.SessionId = sessionId;
this.connection = connection;
- this.info = info;
+ this.connection.Oneway(this.info);
+
this.acknowledgementMode = acknowledgementMode;
this.requestTimeout = connection.RequestTimeout;
- this.dispatchAsync = dispatchAsync;
+ this.dispatchAsync = connection.DispatchAsync;
if(acknowledgementMode ==
AcknowledgementMode.Transactional)
{
this.transactionContext = new
TransactionContext(this);
}
+ Uri brokerUri = connection.BrokerUri;
+
+ // Set propertieDs on session using parameters prefixed with
"session."
+ if(!String.IsNullOrEmpty(brokerUri.Query) &&
!brokerUri.OriginalString.EndsWith(")"))
+ {
+ string query =
brokerUri.Query.Substring(brokerUri.Query.LastIndexOf(")") + 1);
+ StringDictionary options = URISupport.ParseQuery(query);
+ options = URISupport.GetProperties(options, "session.");
+ URISupport.SetProperties(this, options);
+ }
+
+ this.ConsumerTransformer = connection.ConsumerTransformer;
+ this.ProducerTransformer = connection.ProducerTransformer;
+
this.executor = new SessionExecutor(this,
this.consumers);
+
+ if(connection.IsStarted)
+ {
+ this.Start();
+ }
+
+ connection.AddSession(this);
}
~Session()