Author: rgodfrey
Date: Sat May 11 11:05:32 2013
New Revision: 1481290

URL: http://svn.apache.org/r1481290
Log:
QPID-4829 : [JMS AMQP 1.0] Sessions added to started connections are not 
themselves started

Modified:
    
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
    
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
    
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
    
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java?rev=1481290&r1=1481289&r2=1481290&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
 Sat May 11 11:05:32 2013
@@ -163,6 +163,7 @@ public class ConnectionImpl implements C
                 connect();
                 started = true;
             }
+
             try
             {
                 SessionImpl session = new SessionImpl(this, acknowledgeMode);
@@ -170,6 +171,11 @@ public class ConnectionImpl implements C
                 session.setTopicSession(_isTopicConnection);
                 _sessions.add(session);
 
+                if(_state == State.STARTED)
+                {
+                    session.start();
+                }
+
                 return session;
             }
             catch(JMSException e)
@@ -191,9 +197,17 @@ public class ConnectionImpl implements C
                     throw e;
                 }
             }
-
         }
 
+
+    }
+
+    void removeSession(SessionImpl session)
+    {
+        synchronized (_lock)
+        {
+            _sessions.remove(session);
+        }
     }
 
     private void reconnect(String networkHost, int port, String hostName)
@@ -410,10 +424,7 @@ public class ConnectionImpl implements C
 
     public boolean isStarted()
     {
-        synchronized (_lock)
-        {
-            return _state == State.STARTED;
-        }
+        return _state == State.STARTED;
     }
 
     void setQueueConnection(final boolean queueConnection)

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1481290&r1=1481289&r2=1481290&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
 Sat May 11 11:05:32 2013
@@ -188,15 +188,16 @@ public class MessageConsumerImpl impleme
     {
         checkClosed();
         _messageListener = messageListener;
-        _session.messageListenerSet( this );
         _receiver.setMessageArrivalListener(new 
Receiver.MessageArrivalListener()
-        {
+                {
+
+                    public void messageArrived(final Receiver receiver)
+                    {
+                        _session.messageArrived(MessageConsumerImpl.this);
+                    }
+                });
+        _session.messageListenerSet( this );
 
-            public void messageArrived(final Receiver receiver)
-            {
-                _session.messageArrived(MessageConsumerImpl.this);
-            }
-        });
     }
 
     public MessageImpl receive() throws JMSException

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1481290&r1=1481289&r2=1481290&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
 Sat May 11 11:05:32 2013
@@ -230,6 +230,7 @@ public class SessionImpl implements Sess
                 producer.close();
             }
             _session.close();
+            _connection.removeSession(this);
         }
     }
 
@@ -765,7 +766,6 @@ public class SessionImpl implements Sess
         return _txn;
     }
 
-
     private class Dispatcher implements Runnable
     {
 
@@ -816,7 +816,6 @@ public class SessionImpl implements Sess
                             msg = consumer.receive0(0L);
                         }
 
-
                         MessageListener listener = consumer._messageListener;
 
                         MessageImpl message = consumer.createJMSMessage(msg, 
recoveredMessage);

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java?rev=1481290&r1=1481289&r2=1481290&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
 Sat May 11 11:05:32 2013
@@ -566,6 +566,11 @@ public class Receiver implements Deliver
         synchronized(_endpoint.getLock())
         {
             _messageArrivalListener = messageArrivalListener;
+            int prefetchSize = _prefetchQueue.size();
+            for(int i = 0; i < prefetchSize; i++)
+            {
+                postPrefetchAction();
+            }
         }
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to