Author: rgodfrey
Date: Sat May 11 11:05:32 2013
New Revision: 1481290
URL: http://svn.apache.org/r1481290
Log:
QPID-4829 : [JMS AMQP 1.0] Sessions added to started connections are not
themselves started
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java?rev=1481290&r1=1481289&r2=1481290&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
Sat May 11 11:05:32 2013
@@ -163,6 +163,7 @@ public class ConnectionImpl implements C
connect();
started = true;
}
+
try
{
SessionImpl session = new SessionImpl(this, acknowledgeMode);
@@ -170,6 +171,11 @@ public class ConnectionImpl implements C
session.setTopicSession(_isTopicConnection);
_sessions.add(session);
+ if(_state == State.STARTED)
+ {
+ session.start();
+ }
+
return session;
}
catch(JMSException e)
@@ -191,9 +197,17 @@ public class ConnectionImpl implements C
throw e;
}
}
-
}
+
+ }
+
+ void removeSession(SessionImpl session)
+ {
+ synchronized (_lock)
+ {
+ _sessions.remove(session);
+ }
}
private void reconnect(String networkHost, int port, String hostName)
@@ -410,10 +424,7 @@ public class ConnectionImpl implements C
public boolean isStarted()
{
- synchronized (_lock)
- {
- return _state == State.STARTED;
- }
+ return _state == State.STARTED;
}
void setQueueConnection(final boolean queueConnection)
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1481290&r1=1481289&r2=1481290&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
Sat May 11 11:05:32 2013
@@ -188,15 +188,16 @@ public class MessageConsumerImpl impleme
{
checkClosed();
_messageListener = messageListener;
- _session.messageListenerSet( this );
_receiver.setMessageArrivalListener(new
Receiver.MessageArrivalListener()
- {
+ {
+
+ public void messageArrived(final Receiver receiver)
+ {
+ _session.messageArrived(MessageConsumerImpl.this);
+ }
+ });
+ _session.messageListenerSet( this );
- public void messageArrived(final Receiver receiver)
- {
- _session.messageArrived(MessageConsumerImpl.this);
- }
- });
}
public MessageImpl receive() throws JMSException
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1481290&r1=1481289&r2=1481290&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
Sat May 11 11:05:32 2013
@@ -230,6 +230,7 @@ public class SessionImpl implements Sess
producer.close();
}
_session.close();
+ _connection.removeSession(this);
}
}
@@ -765,7 +766,6 @@ public class SessionImpl implements Sess
return _txn;
}
-
private class Dispatcher implements Runnable
{
@@ -816,7 +816,6 @@ public class SessionImpl implements Sess
msg = consumer.receive0(0L);
}
-
MessageListener listener = consumer._messageListener;
MessageImpl message = consumer.createJMSMessage(msg,
recoveredMessage);
Modified:
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java?rev=1481290&r1=1481289&r2=1481290&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
Sat May 11 11:05:32 2013
@@ -566,6 +566,11 @@ public class Receiver implements Deliver
synchronized(_endpoint.getLock())
{
_messageArrivalListener = messageArrivalListener;
+ int prefetchSize = _prefetchQueue.size();
+ for(int i = 0; i < prefetchSize; i++)
+ {
+ postPrefetchAction();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]