Author: tabish Date: Wed Dec 18 22:36:11 2013 New Revision: 1552134 URL: http://svn.apache.org/r1552134 Log: https://issues.apache.org/jira/browse/AMQNET-454
Apply patch https://issues.apache.org/jira/secure/attachment/12619423/Apache.NMS.AMQP-qpid-object-lifecycle-02.patch Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs (with props) Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs?rev=1552134&r1=1552133&r2=1552134&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs Wed Dec 18 22:36:11 2013 @@ -50,7 +50,7 @@ namespace Apache.NMS.Amqp private int sessionCounter = 0; private readonly IList sessions = ArrayList.Synchronized(new ArrayList()); - Org.Apache.Qpid.Messaging.Connection qpidConnection = null; // Don't create until Start() + private Org.Apache.Qpid.Messaging.Connection qpidConnection = null; // Don't create until Start() /// <summary> /// Creates new connection @@ -81,7 +81,7 @@ namespace Apache.NMS.Amqp { foreach (Session session in sessions) { - //session.Start(); + session.Start(); } } } @@ -336,7 +336,44 @@ namespace Apache.NMS.Amqp public void Close() { - Dispose(); + if (!this.closed.Value) + { + this.Stop(); + } + + lock (connectedLock) + { + if (this.closed.Value) + { + return; + } + + try + { + Tracer.InfoFormat("Connection[]: Closing Connection Now."); + this.closing.Value = true; + + lock (sessions.SyncRoot) + { + foreach (Session session in sessions) + { + session.Shutdown(); + } + } + sessions.Clear(); + + } + catch (Exception ex) + { + Tracer.ErrorFormat("Connection[]: Error during connection close: {0}", ex); + } + finally + { + this.closed.Value = true; + this.connected.Value = false; + this.closing.Value = false; + } + } } public void PurgeTempDestinations() @@ -359,5 +396,15 @@ namespace Apache.NMS.Amqp { return Interlocked.Increment(ref sessionCounter); } + + public Org.Apache.Qpid.Messaging.Session CreateQpidSession() + { + // TODO: Session name; transactional session + if (!connected.Value) + { + throw new ConnectionClosedException(); + } + return qpidConnection.CreateSession(); + } } } Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs?rev=1552134&r1=1552133&r2=1552134&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs Wed Dec 18 22:36:11 2013 @@ -1,6 +1,3 @@ -using System; -using Org.Apache.Qpid.Messaging; -using System.Threading; /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -17,7 +14,11 @@ using System.Threading; * See the License for the specific language governing permissions and * limitations under the License. */ + +using System; +using System.Threading; using Apache.NMS.Util; +using Org.Apache.Qpid.Messaging; namespace Apache.NMS.Amqp { @@ -26,6 +27,11 @@ namespace Apache.NMS.Amqp /// </summary> public class MessageConsumer : IMessageConsumer { + /// <summary> + /// Private object used for synchronization, instead of public "this" + /// </summary> + private readonly object myLock = new object(); + protected TimeSpan zeroTimeout = new TimeSpan(0); private readonly Session session; @@ -38,6 +44,8 @@ namespace Apache.NMS.Amqp private AutoResetEvent pause = new AutoResetEvent(false); private Atomic<bool> asyncDelivery = new Atomic<bool>(false); + private readonly Atomic<bool> started = new Atomic<bool>(false); + private Org.Apache.Qpid.Messaging.Receiver qpidReceiver = null; private ConsumerTransformerDelegate consumerTransformer; public ConsumerTransformerDelegate ConsumerTransformer @@ -54,6 +62,28 @@ namespace Apache.NMS.Amqp this.acknowledgementMode = acknowledgementMode; } + public void Start() + { + // Don't try creating session if connection not yet up + if (!session.IsStarted) + { + throw new SessionClosedException(); + } + + if (started.CompareAndSet(false, true)) + { + try + { + // Create qpid sender + qpidReceiver = session.CreateQpidReceiver(""); + } + catch (Org.Apache.Qpid.Messaging.QpidException e) + { + throw new NMSException("Failed to create Qpid Receiver : " + e.Message); + } + } + } + public event MessageListener Listener { add Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs?rev=1552134&r1=1552133&r2=1552134&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs Wed Dec 18 22:36:11 2013 @@ -14,7 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + using System; +using System.Threading; +using Apache.NMS.Util; using Org.Apache.Qpid.Messaging; namespace Apache.NMS.Amqp @@ -24,8 +27,13 @@ namespace Apache.NMS.Amqp /// </summary> public class MessageProducer : IMessageProducer { + /// <summary> + /// Private object used for synchronization, instead of public "this" + /// </summary> + private readonly object myLock = new object(); private readonly Session session; + private readonly int id; private Destination destination; //private long messageCounter; @@ -34,10 +42,12 @@ namespace Apache.NMS.Amqp private MsgPriority priority; private bool disableMessageID; private bool disableMessageTimestamp; - private readonly int id; //private IMessageConverter messageConverter; + private readonly Atomic<bool> started = new Atomic<bool>(false); + private Org.Apache.Qpid.Messaging.Sender qpidSender = null; + private ProducerTransformerDelegate producerTransformer; public ProducerTransformerDelegate ProducerTransformer { @@ -52,6 +62,28 @@ namespace Apache.NMS.Amqp this.destination = destination; } + public void Start() + { + // Don't try creating session if connection not yet up + if (!session.IsStarted) + { + throw new SessionClosedException(); + } + + if (started.CompareAndSet(false, true)) + { + try + { + // Create qpid sender + qpidSender = session.CreateQpidSender(""); + } + catch (Org.Apache.Qpid.Messaging.QpidException e) + { + throw new NMSException("Failed to create Qpid Sender : " + e.Message); + } + } + } + public void Send(IMessage message) { Send(Destination, message); Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs?rev=1552134&r1=1552133&r2=1552134&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs Wed Dec 18 22:36:11 2013 @@ -17,6 +17,7 @@ using System; using System.Collections; using System.Threading; +using Apache.NMS.Util; using Org.Apache.Qpid.Messaging; namespace Apache.NMS.Amqp @@ -43,6 +44,8 @@ namespace Apache.NMS.Amqp private int producerCounter; private long nextDeliveryId; private long lastDeliveredSequenceId; + private readonly object sessionLock = new object(); + private readonly Atomic<bool> started = new Atomic<bool>(false); protected bool disposed = false; protected bool closed = false; protected bool closing = false; @@ -50,6 +53,8 @@ namespace Apache.NMS.Amqp private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite); private TimeSpan requestTimeout; + private Org.Apache.Qpid.Messaging.Session qpidSession = null; // Don't create until Start() + public Session(Connection connection, int sessionId, AcknowledgementMode acknowledgementMode) { this.connection = connection; @@ -61,6 +66,58 @@ namespace Apache.NMS.Amqp // TODO: transactions throw new NotSupportedException("Transactions are not supported by Qpid/Amqp"); } + if (connection.IsStarted) + { + this.Start(); + } + connection.AddSession(this); + } + + /// <summary> + /// Create new unmanaged session and start senders and receivers + /// Associated connection must be open. + /// </summary> + public void Start() + { + // Don't try creating session if connection not yet up + if (!connection.IsStarted) + { + throw new ConnectionClosedException(); + } + + if (started.CompareAndSet(false, true)) + { + try + { + // Create qpid session + qpidSession = connection.CreateQpidSession(); + + // Start producers and consumers + lock (producers.SyncRoot) + { + foreach (MessageProducer producer in producers.Values) + { + producer.Start(); + } + } + lock (consumers.SyncRoot) + { + foreach (MessageConsumer consumer in consumers.Values) + { + consumer.Start(); + } + } + } + catch (Org.Apache.Qpid.Messaging.QpidException e) + { + throw new SessionClosedException( "Failed to create session : " + e.Message ); + } + } + } + + public bool IsStarted + { + get { return started.Value; } } public void Dispose() @@ -136,7 +193,7 @@ namespace Apache.NMS.Amqp { foreach (MessageConsumer consumer in consumers.Values) { - consumer.Shutdown(); + consumer.Close(); } } consumers.Clear(); @@ -145,7 +202,7 @@ namespace Apache.NMS.Amqp { foreach (MessageProducer producer in producers.Values) { - producer.Shutdown(); + producer.Close(); } } producers.Clear(); @@ -463,7 +520,26 @@ namespace Apache.NMS.Amqp { get { return id; } } - + + + public Org.Apache.Qpid.Messaging.Receiver CreateQpidReceiver(string address) + { + if (!IsStarted) + { + throw new SessionClosedException(); + } + return qpidSession.CreateReceiver(address); + } + + public Org.Apache.Qpid.Messaging.Sender CreateQpidSender(string address) + { + if (!IsStarted) + { + throw new SessionClosedException(); + } + return qpidSession.CreateSender(address); + } + #region Transaction State Events public event SessionTxEventDelegate TransactionStartedListener; Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs?rev=1552134&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs Wed Dec 18 22:36:11 2013 @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; + +namespace Apache.NMS.Amqp +{ + /// <summary> + /// Exception thrown when a session is used that it already closed + /// </summary> + [Serializable] + public class SessionClosedException : NMSException + { + public SessionClosedException() + : base("The session is already closed!") + { + } + + public SessionClosedException(string message) + : base(message) + { + } + + public SessionClosedException(string message, string errorCode) + : base(message, errorCode) + { + } + + public SessionClosedException(string message, Exception innerException) + : base(message, innerException) + { + } + + public SessionClosedException(string message, string errorCode, Exception innerException) + : base(message, errorCode, innerException) + { + } + + #region ISerializable interface implementation + + /// <summary> + /// Initializes a new instance of the SessionClosedException class with serialized data. + /// Throws System.ArgumentNullException if the info parameter is null. + /// Throws System.Runtime.Serialization.SerializationException if the class name is null or System.Exception.HResult is zero (0). + /// </summary> + /// <param name="info">The SerializationInfo that holds the serialized object data about the exception being thrown.</param> + /// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param> + protected SessionClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) + : base(info, context) + { + } + + #endregion + } +} \ No newline at end of file Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs ------------------------------------------------------------------------------ svn:eol-style = native
