Author: orudyy
Date: Wed Oct 24 00:05:45 2012
New Revision: 1401515
URL: http://svn.apache.org/viewvc?rev=1401515&view=rev
Log:
QPID-4389: Send the selector of durable subscriber in arguments of ExchangeBind
command
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1401515&r1=1401514&r2=1401515&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Wed Oct 24 00:05:45 2012
@@ -53,6 +53,7 @@ import org.apache.qpid.client.messaging.
import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
import org.apache.qpid.client.messaging.address.Node;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -624,7 +625,9 @@ public class AMQSession_0_10 extends AMQ
{
if (AMQDestination.TOPIC_TYPE ==
consumer.getDestination().getAddressType())
{
- createSubscriptionQueue(consumer.getDestination(),
consumer.isNoLocal());
+ String selector = consumer.getMessageSelectorFilter() ==
null? null : consumer.getMessageSelectorFilter().getSelector();
+
+ createSubscriptionQueue(consumer.getDestination(),
consumer.isNoLocal(), selector);
queueName = consumer.getDestination().getAMQQueueName();
consumer.setQueuename(queueName);
}
@@ -1300,8 +1303,8 @@ public class AMQSession_0_10 extends AMQ
}
}
}
-
- void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws
AMQException
+
+ void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String
messageSelector) throws AMQException
{
Link link = dest.getLink();
String queueName = dest.getQueueName();
@@ -1325,12 +1328,14 @@ public class AMQSession_0_10 extends AMQ
link.isDurable() ? Option.DURABLE : Option.NONE,
queueProps.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+ Map<String,Object> bindingArguments = new HashMap<String, Object>();
+
bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(),
messageSelector == null ? "" : messageSelector);
getQpidSession().exchangeBind(queueName,
- dest.getAddressName(),
- dest.getSubject(),
- Collections.<String,Object>emptyMap());
+ dest.getAddressName(),
+ dest.getSubject(),
+ bindingArguments);
}
-
+
public void setLegacyFieldsForQueueType(AMQDestination dest)
{
// legacy support
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1401515&r1=1401514&r2=1401515&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
Wed Oct 24 00:05:45 2012
@@ -941,7 +941,65 @@ public class AddressBasedDestinationTest
e.getMessage());
}
}
-
+
+ public void testDurableSubscription() throws Exception
+ {
+ Session session = _connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic("ADDR:amq.topic/" +
getTestQueueName());
+ MessageProducer publisher = session.createProducer(topic);
+ MessageConsumer subscriber = session.createDurableSubscriber(topic,
getTestQueueName());
+
+ TextMessage messageToSend = session.createTextMessage("Test0");
+ publisher.send(messageToSend);
+ ((AMQSession<?,?>)session).sync();
+
+ Message receivedMessage = subscriber.receive(1000);
+ assertNotNull("Message has not been received", receivedMessage);
+ assertEquals("Unexpected message", messageToSend.getText(),
((TextMessage)receivedMessage).getText());
+
+ subscriber.close();
+
+ messageToSend = session.createTextMessage("Test1");
+ publisher.send(messageToSend);
+ ((AMQSession<?,?>)session).sync();
+
+ subscriber = session.createDurableSubscriber(topic,
getTestQueueName());
+ receivedMessage = subscriber.receive(1000);
+ assertNotNull("Message has not been received", receivedMessage);
+ assertEquals("Unexpected message", messageToSend.getText(),
((TextMessage)receivedMessage).getText());
+ }
+
+ public void testDurableSubscriptionnWithSelector() throws Exception
+ {
+ Session session = _connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic("ADDR:amq.topic/" +
getTestQueueName());
+ MessageProducer publisher = session.createProducer(topic);
+ MessageConsumer subscriber = session.createDurableSubscriber(topic,
getTestQueueName(), "id=1", false);
+
+ TextMessage messageToSend = session.createTextMessage("Test0");
+ messageToSend.setIntProperty("id", 1);
+ publisher.send(messageToSend);
+ ((AMQSession<?,?>)session).sync();
+
+ Message receivedMessage = subscriber.receive(1000);
+ assertNotNull("Message has not been received", receivedMessage);
+ assertEquals("Unexpected message", messageToSend.getText(),
((TextMessage)receivedMessage).getText());
+ assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id"));
+
+ subscriber.close();
+
+ messageToSend = session.createTextMessage("Test1");
+ messageToSend.setIntProperty("id", 1);
+ publisher.send(messageToSend);
+ ((AMQSession<?,?>)session).sync();
+
+ subscriber = session.createDurableSubscriber(topic,
getTestQueueName(), "id=1", false);
+ receivedMessage = subscriber.receive(1000);
+ assertNotNull("Message has not been received", receivedMessage);
+ assertEquals("Unexpected message", messageToSend.getText(),
((TextMessage)receivedMessage).getText());
+ assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id"));
+ }
+
private void createDurableSubscriber(Context ctx,Session ssn,String
destName,Topic topic, String producerAddr) throws Exception
{
MessageConsumer cons = ssn.createDurableSubscriber(topic, destName);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]