Author: rajith
Date: Tue Jul 8 23:18:46 2014
New Revision: 1608971
URL: http://svn.apache.org/r1608971
Log:
QPID-5870 A Consumer is now marked if it's using a durable subscription.
The topic subscription queue is now deleted when the subscription ends unless
it's marked as a durable-topic-subscription.
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1608971&r1=1608970&r2=1608971&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Tue Jul 8 23:18:46 2014
@@ -1105,6 +1105,7 @@ public abstract class AMQSession<C exten
try
{
C consumer = (C) createConsumer(dest, messageSelector,
noLocal);
+ consumer.markAsDurableSubscriber();
subscriber = new TopicSubscriberAdaptor<C>(dest, consumer);
// Save subscription information
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1608971&r1=1608970&r2=1608971&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Tue Jul 8 23:18:46 2014
@@ -1143,26 +1143,35 @@ public class AMQSession_0_10 extends AMQ
public boolean isQueueExist(AMQDestination dest, boolean assertNode)
throws AMQException
{
+ Node node = dest.getNode();
+ return isQueueExist(dest.getAddressName(), assertNode,
+ node.isDurable(), node.isAutoDelete(),
+ node.isExclusive(), node.getDeclareArgs());
+ }
+
+ public boolean isQueueExist(String queueName, boolean assertNode,
+ boolean durable, boolean autoDelete,
+ boolean exclusive, Map<String, Object> args)
throws AMQException
+ {
boolean match = true;
try
{
- QueueQueryResult result =
getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
- match = dest.getAddressName().equals(result.getQueue());
- Node node = dest.getNode();
+ QueueQueryResult result = getQpidSession().queueQuery(queueName,
Option.NONE).get();
+ match = queueName.equals(result.getQueue());
if (match && assertNode)
{
- match = (result.getDurable() == node.isDurable()) &&
- (result.getAutoDelete() == node.isAutoDelete()) &&
- (result.getExclusive() == node.isExclusive()) &&
-
(matchProps(result.getArguments(),node.getDeclareArgs()));
+ match = (result.getDurable() == durable) &&
+ (result.getAutoDelete() == autoDelete) &&
+ (result.getExclusive() == exclusive) &&
+ (matchProps(result.getArguments(),args));
}
if (assertNode)
{
if (!match)
{
- throw new AMQException("Assert failed for address : " +
dest +", Result was : " + result);
+ throw new AMQException("Assert failed for queue : " +
queueName +", Result was : " + result);
}
}
}
@@ -1596,7 +1605,7 @@ public class AMQSession_0_10 extends AMQ
// We need to delete the subscription queue.
if (dest.getAddressType() == AMQDestination.TOPIC_TYPE &&
dest.getLink().getSubscriptionQueue().isExclusive() &&
- isQueueExist(dest, false))
+ isQueueExist(dest.getQueueName(), false, false, false, false,
null))
{
getQpidSession().queueDelete(dest.getQueueName());
}
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1608971&r1=1608970&r2=1608971&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Tue Jul 8 23:18:46 2014
@@ -134,6 +134,7 @@ public abstract class BasicMessageConsum
private final boolean _browseOnly;
private List<StackTraceElement> _closedStack = null;
+ private boolean _isDurableSubscriber = false;
protected BasicMessageConsumer(int channelId, AMQConnection connection,
AMQDestination destination,
String messageSelector, boolean noLocal,
MessageFactoryRegistry messageFactory,
@@ -1035,4 +1036,14 @@ public abstract class BasicMessageConsum
{
return _messageFactory;
}
+
+ protected boolean isDurableSubscriber()
+ {
+ return _isDurableSubscriber;
+ }
+
+ protected void markAsDurableSubscriber()
+ {
+ _isDurableSubscriber = true;
+ }
}
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1608971&r1=1608970&r2=1608971&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Tue Jul 8 23:18:46 2014
@@ -509,11 +509,14 @@ public class BasicMessageConsumer_0_10 e
if (dest.getDelete() == AddressOption.ALWAYS ||
dest.getDelete() == AddressOption.RECEIVER )
{
- ((AMQSession_0_10) getSession()).handleNodeDelete(dest);
- ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
+ ((AMQSession_0_10) getSession()).handleNodeDelete(dest);
}
// Subscription queue is handled as part of linkDelete method.
((AMQSession_0_10) getSession()).handleLinkDelete(dest);
+ if (!isDurableSubscriber())
+ {
+ ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]