Author: rajith
Date: Tue Jul  8 23:18:46 2014
New Revision: 1608971

URL: http://svn.apache.org/r1608971
Log:
QPID-5870 A Consumer is now marked if it's using a durable subscription.
The topic subscription queue is now deleted when the subscription ends unless 
it's marked as a durable-topic-subscription.

Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1608971&r1=1608970&r2=1608971&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Tue Jul  8 23:18:46 2014
@@ -1105,6 +1105,7 @@ public abstract class AMQSession<C exten
             try
             {
                 C consumer = (C) createConsumer(dest, messageSelector, 
noLocal);
+                consumer.markAsDurableSubscriber();
                 subscriber = new TopicSubscriberAdaptor<C>(dest, consumer);
 
                 // Save subscription information

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1608971&r1=1608970&r2=1608971&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Tue Jul  8 23:18:46 2014
@@ -1143,26 +1143,35 @@ public class AMQSession_0_10 extends AMQ
 
     public boolean isQueueExist(AMQDestination dest, boolean assertNode) 
throws AMQException
     {
+        Node node = dest.getNode();
+        return isQueueExist(dest.getAddressName(), assertNode,
+                            node.isDurable(), node.isAutoDelete(),
+                            node.isExclusive(), node.getDeclareArgs());
+    }
+    
+    public boolean isQueueExist(String queueName, boolean assertNode,
+                                boolean durable, boolean autoDelete,
+                                boolean exclusive, Map<String, Object> args) 
throws AMQException
+    {    
         boolean match = true;
         try
         {
-            QueueQueryResult result = 
getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
-            match = dest.getAddressName().equals(result.getQueue());
-            Node node = dest.getNode();
+            QueueQueryResult result = getQpidSession().queueQuery(queueName, 
Option.NONE).get();
+            match = queueName.equals(result.getQueue());
 
             if (match && assertNode)
             {
-                match = (result.getDurable() == node.isDurable()) &&
-                         (result.getAutoDelete() == node.isAutoDelete()) &&
-                         (result.getExclusive() == node.isExclusive()) &&
-                         
(matchProps(result.getArguments(),node.getDeclareArgs()));
+                match = (result.getDurable() == durable) &&
+                         (result.getAutoDelete() == autoDelete) &&
+                         (result.getExclusive() == exclusive) &&
+                         (matchProps(result.getArguments(),args));
             }
 
             if (assertNode)
             {
                 if (!match)
                 {
-                    throw new AMQException("Assert failed for address : " + 
dest  +", Result was : " + result);
+                    throw new AMQException("Assert failed for queue : " + 
queueName  +", Result was : " + result);
                 }
             }
         }
@@ -1596,7 +1605,7 @@ public class AMQSession_0_10 extends AMQ
         // We need to delete the subscription queue.
         if (dest.getAddressType() == AMQDestination.TOPIC_TYPE &&
             dest.getLink().getSubscriptionQueue().isExclusive() &&
-            isQueueExist(dest, false))
+            isQueueExist(dest.getQueueName(), false, false, false, false, 
null))
         {
             getQpidSession().queueDelete(dest.getQueueName());
         }

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1608971&r1=1608970&r2=1608971&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Tue Jul  8 23:18:46 2014
@@ -134,6 +134,7 @@ public abstract class BasicMessageConsum
     private final boolean _browseOnly;
     private List<StackTraceElement> _closedStack = null;
 
+    private boolean _isDurableSubscriber = false;
 
     protected BasicMessageConsumer(int channelId, AMQConnection connection, 
AMQDestination destination,
                                    String messageSelector, boolean noLocal, 
MessageFactoryRegistry messageFactory,
@@ -1035,4 +1036,14 @@ public abstract class BasicMessageConsum
     {
         return _messageFactory;
     }
+
+    protected boolean isDurableSubscriber()
+    {
+        return _isDurableSubscriber;
+    }
+
+    protected void markAsDurableSubscriber()
+    {
+        _isDurableSubscriber = true;
+    }
 }

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1608971&r1=1608970&r2=1608971&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 Tue Jul  8 23:18:46 2014
@@ -509,11 +509,14 @@ public class BasicMessageConsumer_0_10 e
             if (dest.getDelete() == AddressOption.ALWAYS ||
                 dest.getDelete() == AddressOption.RECEIVER )
             {
-                ((AMQSession_0_10) getSession()).handleNodeDelete(dest);
-                ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
+                ((AMQSession_0_10) getSession()).handleNodeDelete(dest);       
         
             }
             // Subscription queue is handled as part of linkDelete method.
             ((AMQSession_0_10) getSession()).handleLinkDelete(dest);
+            if (!isDurableSubscriber())
+            {
+                ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
+            }
         }
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to