Author: rajith
Date: Thu Mar 3 04:49:50 2011
New Revision: 1076516
URL: http://svn.apache.org/viewvc?rev=1076516&view=rev
Log:
QPID-3106
Instead of checking if it's an instance of AMQQueue, the code now checks if
it's an instance of both AMQDestination and javax.jms.Queue, to cover the
AMQAnyDestination case. The same check is done for topics. Added test cases for
QueueReceivers, TopicSubscribers and DurableTopicSubscribers using the new
addressing scheme.
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1076516&r1=1076515&r2=1076516&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Thu Mar 3 04:49:50 2011
@@ -1043,7 +1043,7 @@ public abstract class AMQSession<C exten
throws JMSException
{
checkNotClosed();
- AMQTopic origTopic = checkValidTopic(topic, true);
+ Topic origTopic = checkValidTopic(topic, true);
AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name,
_connection);
String messageSelector = ((selector == null) ||
(selector.trim().length() == 0)) ? null : selector;
@@ -1307,8 +1307,8 @@ public abstract class AMQSession<C exten
public QueueReceiver createQueueReceiver(Destination destination) throws
JMSException
{
checkValidDestination(destination);
- AMQQueue dest = (AMQQueue) destination;
- C consumer = (C) createConsumer(destination);
+ Queue dest = validateQueue(destination);
+ C consumer = (C) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1326,8 +1326,8 @@ public abstract class AMQSession<C exten
public QueueReceiver createQueueReceiver(Destination destination, String
messageSelector) throws JMSException
{
checkValidDestination(destination);
- AMQQueue dest = (AMQQueue) destination;
- C consumer = (C) createConsumer(destination, messageSelector);
+ Queue dest = validateQueue(destination);
+ C consumer = (C) createConsumer(dest, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1344,7 +1344,7 @@ public abstract class AMQSession<C exten
public QueueReceiver createReceiver(Queue queue) throws JMSException
{
checkNotClosed();
- AMQQueue dest = (AMQQueue) queue;
+ Queue dest = validateQueue(queue);
C consumer = (C) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
@@ -1363,11 +1363,23 @@ public abstract class AMQSession<C exten
public QueueReceiver createReceiver(Queue queue, String messageSelector)
throws JMSException
{
checkNotClosed();
- AMQQueue dest = (AMQQueue) queue;
+ Queue dest = validateQueue(queue);
C consumer = (C) createConsumer(dest, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
+
+ private Queue validateQueue(Destination dest) throws
InvalidDestinationException
+ {
+ if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue)
+ {
+ return (Queue)dest;
+ }
+ else
+ {
+ throw new InvalidDestinationException("The destination object used
is not from this provider or of type javax.jms.Queue");
+ }
+ }
public QueueSender createSender(Queue queue) throws JMSException
{
@@ -1408,7 +1420,7 @@ public abstract class AMQSession<C exten
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
checkNotClosed();
- AMQTopic dest = checkValidTopic(topic);
+ Topic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (C)
createExclusiveConsumer(dest));
@@ -1428,7 +1440,7 @@ public abstract class AMQSession<C exten
public TopicSubscriber createSubscriber(Topic topic, String
messageSelector, boolean noLocal) throws JMSException
{
checkNotClosed();
- AMQTopic dest = checkValidTopic(topic);
+ Topic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (C)
createExclusiveConsumer(dest, messageSelector, noLocal));
@@ -2395,7 +2407,7 @@ public abstract class AMQSession<C exten
/*
* I could have combined the last 3 methods, but this way it improves
readability
*/
- protected AMQTopic checkValidTopic(Topic topic, boolean durable) throws
JMSException
+ protected Topic checkValidTopic(Topic topic, boolean durable) throws
JMSException
{
if (topic == null)
{
@@ -2414,17 +2426,17 @@ public abstract class AMQSession<C exten
("Cannot create a durable subscription with a temporary topic:
" + topic);
}
- if (!(topic instanceof AMQTopic))
+ if (!(topic instanceof AMQDestination && topic instanceof
javax.jms.Topic))
{
throw new javax.jms.InvalidDestinationException(
"Cannot create a subscription on topic created for another
JMS Provider, class of topic provided is: "
+ topic.getClass().getName());
}
- return (AMQTopic) topic;
+ return topic;
}
- protected AMQTopic checkValidTopic(Topic topic) throws JMSException
+ protected Topic checkValidTopic(Topic topic) throws JMSException
{
return checkValidTopic(topic, false);
}
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1076516&r1=1076515&r2=1076516&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
Thu Mar 3 04:49:50 2011
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import java.net.URISyntaxException;
+import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Topic;
@@ -95,39 +96,47 @@ public class AMQTopic extends AMQDestina
super(exchangeName, exchangeClass, routingKey, isExclusive,
isAutoDelete, queueName, isDurable,bindingKeys);
}
- public static AMQTopic createDurableTopic(AMQTopic topic, String
subscriptionName, AMQConnection connection)
+ public static AMQTopic createDurableTopic(Topic topic, String
subscriptionName, AMQConnection connection)
throws JMSException
{
- if (topic.getDestSyntax() == DestSyntax.ADDR)
+ if (topic instanceof AMQDestination && topic instanceof
javax.jms.Topic)
{
- try
+ AMQDestination qpidTopic = (AMQDestination)topic;
+ if (qpidTopic.getDestSyntax() == DestSyntax.ADDR)
{
- AMQTopic t = new AMQTopic(topic.getAddress());
- AMQShortString queueName =
getDurableTopicQueueName(subscriptionName, connection);
- // link is never null if dest was created using an address
string.
- t.getLink().setName(queueName.asString());
- t.getSourceNode().setAutoDelete(false);
- t.getSourceNode().setDurable(true);
-
- // The legacy fields are also populated just in case.
- t.setQueueName(queueName);
- t.setAutoDelete(false);
- t.setDurable(true);
- return t;
+ try
+ {
+ AMQTopic t = new AMQTopic(qpidTopic.getAddress());
+ AMQShortString queueName =
getDurableTopicQueueName(subscriptionName, connection);
+ // link is never null if dest was created using an address
string.
+ t.getLink().setName(queueName.asString());
+ t.getSourceNode().setAutoDelete(false);
+ t.getSourceNode().setDurable(true);
+
+ // The legacy fields are also populated just in case.
+ t.setQueueName(queueName);
+ t.setAutoDelete(false);
+ t.setDurable(true);
+ return t;
+ }
+ catch(Exception e)
+ {
+ JMSException ex = new JMSException("Error creating durable
topic");
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
}
- catch(Exception e)
+ else
{
- JMSException ex = new JMSException("Error creating durable
topic");
- ex.initCause(e);
- ex.setLinkedException(e);
- throw ex;
+ return new AMQTopic(qpidTopic.getExchangeName(),
qpidTopic.getRoutingKey(), false,
+ getDurableTopicQueueName(subscriptionName,
connection),
+ true);
}
}
else
{
- return new AMQTopic(topic.getExchangeName(),
topic.getRoutingKey(), false,
- getDurableTopicQueueName(subscriptionName,
connection),
- true);
+ throw new InvalidDestinationException("The destination object used
is not from this provider or of type javax.jms.Topic");
}
}
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1076516&r1=1076515&r2=1076516&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
Thu Mar 3 04:49:50 2011
@@ -31,12 +31,18 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
import javax.naming.Context;
import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
@@ -796,4 +802,46 @@ public class AddressBasedDestinationTest
{
}
}
+
+ public void testQueueReceiversAndTopicSubscriber() throws Exception
+ {
+ Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}");
+ Topic topic = new AMQAnyDestination("ADDR:amq.topic/test");
+
+ QueueSession qSession =
((AMQConnection)_connection).createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
+ QueueReceiver receiver = qSession.createReceiver(queue);
+
+ TopicSession tSession =
((AMQConnection)_connection).createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber sub = tSession.createSubscriber(topic);
+
+ Session ssn = _connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod1 =
ssn.createProducer(ssn.createQueue("ADDR:my-queue"));
+ prod1.send(ssn.createTextMessage("test1"));
+
+ MessageProducer prod2 =
ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test"));
+ prod2.send(ssn.createTextMessage("test2"));
+
+ Message msg1 = receiver.receive();
+ assertNotNull(msg1);
+ assertEquals("test1",((TextMessage)msg1).getText());
+
+ Message msg2 = sub.receive();
+ assertNotNull(msg2);
+ assertEquals("test2",((TextMessage)msg2).getText());
+ }
+
+ public void testDurableSubscriber() throws Exception
+ {
+ Session ssn =
_connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Topic topic = ssn.createTopic("news.us");
+
+ MessageConsumer cons = ssn.createDurableSubscriber(topic, "my-sub");
+ MessageProducer prod = ssn.createProducer(topic);
+
+ Message m = ssn.createTextMessage("A");
+ prod.send(m);
+ Message msg = cons.receive(1000);
+ assertNotNull(msg);
+ assertEquals("A",((TextMessage)msg).getText());
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]