Author: orudyy
Date: Wed Oct 24 00:05:45 2012
New Revision: 1401515

URL: http://svn.apache.org/viewvc?rev=1401515&view=rev
Log:
QPID-4389: Send the selector of durable subscriber in arguments of ExchangeBind 
command

Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1401515&r1=1401514&r2=1401515&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Wed Oct 24 00:05:45 2012
@@ -53,6 +53,7 @@ import org.apache.qpid.client.messaging.
 import org.apache.qpid.client.messaging.address.Link;
 import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
 import org.apache.qpid.client.messaging.address.Node;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -624,7 +625,9 @@ public class AMQSession_0_10 extends AMQ
         {
             if (AMQDestination.TOPIC_TYPE == 
consumer.getDestination().getAddressType())
             {
-                createSubscriptionQueue(consumer.getDestination(), 
consumer.isNoLocal());
+                String selector =  consumer.getMessageSelectorFilter() == 
null? null : consumer.getMessageSelectorFilter().getSelector();
+
+                createSubscriptionQueue(consumer.getDestination(), 
consumer.isNoLocal(), selector);
                 queueName = consumer.getDestination().getAMQQueueName();
                 consumer.setQueuename(queueName);
             }
@@ -1300,8 +1303,8 @@ public class AMQSession_0_10 extends AMQ
             }
         }
     }
-    
-    void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws 
AMQException
+
+    void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String 
messageSelector) throws AMQException
     {
         Link link = dest.getLink();
         String queueName = dest.getQueueName();
@@ -1325,12 +1328,14 @@ public class AMQSession_0_10 extends AMQ
                 link.isDurable() ? Option.DURABLE : Option.NONE,
                 queueProps.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
 
+        Map<String,Object> bindingArguments = new HashMap<String, Object>();
+        
bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), 
messageSelector == null ? "" : messageSelector);
         getQpidSession().exchangeBind(queueName,
-                                     dest.getAddressName(), 
-                                     dest.getSubject(), 
-                                     Collections.<String,Object>emptyMap());
+                              dest.getAddressName(),
+                              dest.getSubject(),
+                              bindingArguments);
     }
-    
+
     public void setLegacyFieldsForQueueType(AMQDestination dest)
     {
         // legacy support

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1401515&r1=1401514&r2=1401515&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
 Wed Oct 24 00:05:45 2012
@@ -941,7 +941,65 @@ public class AddressBasedDestinationTest
                     e.getMessage());
         }
     }
-    
+
+    public void testDurableSubscription() throws Exception
+    {
+        Session session = _connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic("ADDR:amq.topic/" + 
getTestQueueName());
+        MessageProducer publisher = session.createProducer(topic);
+        MessageConsumer subscriber = session.createDurableSubscriber(topic, 
getTestQueueName());
+
+        TextMessage messageToSend = session.createTextMessage("Test0");
+        publisher.send(messageToSend);
+        ((AMQSession<?,?>)session).sync();
+
+        Message receivedMessage = subscriber.receive(1000);
+        assertNotNull("Message has not been received", receivedMessage);
+        assertEquals("Unexpected message", messageToSend.getText(), 
((TextMessage)receivedMessage).getText());
+
+        subscriber.close();
+
+        messageToSend = session.createTextMessage("Test1");
+        publisher.send(messageToSend);
+        ((AMQSession<?,?>)session).sync();
+
+        subscriber = session.createDurableSubscriber(topic, 
getTestQueueName());
+        receivedMessage = subscriber.receive(1000);
+        assertNotNull("Message has not been received", receivedMessage);
+        assertEquals("Unexpected message", messageToSend.getText(), 
((TextMessage)receivedMessage).getText());
+    }
+
+    public void testDurableSubscriptionnWithSelector() throws Exception
+    {
+        Session session = _connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic("ADDR:amq.topic/" + 
getTestQueueName());
+        MessageProducer publisher = session.createProducer(topic);
+        MessageConsumer subscriber = session.createDurableSubscriber(topic, 
getTestQueueName(), "id=1", false);
+
+        TextMessage messageToSend = session.createTextMessage("Test0");
+        messageToSend.setIntProperty("id", 1);
+        publisher.send(messageToSend);
+        ((AMQSession<?,?>)session).sync();
+
+        Message receivedMessage = subscriber.receive(1000);
+        assertNotNull("Message has not been received", receivedMessage);
+        assertEquals("Unexpected message", messageToSend.getText(), 
((TextMessage)receivedMessage).getText());
+        assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id"));
+
+        subscriber.close();
+
+        messageToSend = session.createTextMessage("Test1");
+        messageToSend.setIntProperty("id", 1);
+        publisher.send(messageToSend);
+        ((AMQSession<?,?>)session).sync();
+
+        subscriber = session.createDurableSubscriber(topic, 
getTestQueueName(), "id=1", false);
+        receivedMessage = subscriber.receive(1000);
+        assertNotNull("Message has not been received", receivedMessage);
+        assertEquals("Unexpected message", messageToSend.getText(), 
((TextMessage)receivedMessage).getText());
+        assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id"));
+    }
+
     private void createDurableSubscriber(Context ctx,Session ssn,String 
destName,Topic topic, String producerAddr) throws Exception
     {        
         MessageConsumer cons = ssn.createDurableSubscriber(topic, destName);



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to