Author: rajith
Date: Thu Mar  3 04:49:50 2011
New Revision: 1076516

URL: http://svn.apache.org/viewvc?rev=1076516&view=rev
Log:
QPID-3106
Instead of checking if it's an instance of AMQQueue, the code now checks if 
it's an instance of AMQDestination and javax.jms.Queue to cover the 
AMQAnyDestination case. The same check is done for topics. Added test cases for 
QueueReceivers, TopicSubscribers and DurableTopicSubscribers using the new 
addressing scheme.

Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1076516&r1=1076515&r2=1076516&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu Mar  3 04:49:50 2011
@@ -1043,7 +1043,7 @@ public abstract class AMQSession<C exten
             throws JMSException
     {
         checkNotClosed();
-        AMQTopic origTopic = checkValidTopic(topic, true);
+        Topic origTopic = checkValidTopic(topic, true);
         AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, 
_connection);
         
         String messageSelector = ((selector == null) || 
(selector.trim().length() == 0)) ? null : selector;
@@ -1307,8 +1307,8 @@ public abstract class AMQSession<C exten
     public QueueReceiver createQueueReceiver(Destination destination) throws 
JMSException
     {
         checkValidDestination(destination);
-        AMQQueue dest = (AMQQueue) destination;
-        C consumer = (C) createConsumer(destination);
+        Queue dest = validateQueue(destination);
+        C consumer = (C) createConsumer(dest);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1326,8 +1326,8 @@ public abstract class AMQSession<C exten
     public QueueReceiver createQueueReceiver(Destination destination, String 
messageSelector) throws JMSException
     {
         checkValidDestination(destination);
-        AMQQueue dest = (AMQQueue) destination;
-        C consumer = (C) createConsumer(destination, messageSelector);
+        Queue dest = validateQueue(destination);
+        C consumer = (C) createConsumer(dest, messageSelector);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1344,7 +1344,7 @@ public abstract class AMQSession<C exten
     public QueueReceiver createReceiver(Queue queue) throws JMSException
     {
         checkNotClosed();
-        AMQQueue dest = (AMQQueue) queue;
+        Queue dest = validateQueue(queue);
         C consumer = (C) createConsumer(dest);
 
         return new QueueReceiverAdaptor(dest, consumer);
@@ -1363,11 +1363,23 @@ public abstract class AMQSession<C exten
     public QueueReceiver createReceiver(Queue queue, String messageSelector) 
throws JMSException
     {
         checkNotClosed();
-        AMQQueue dest = (AMQQueue) queue;
+        Queue dest = validateQueue(queue);
         C consumer = (C) createConsumer(dest, messageSelector);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
+    
+    private Queue validateQueue(Destination dest) throws 
InvalidDestinationException
+    {
+        if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue)
+        {
+            return (Queue)dest;
+        }
+        else
+        {
+            throw new InvalidDestinationException("The destination object used 
is not from this provider or of type javax.jms.Queue");
+        }
+    }
 
     public QueueSender createSender(Queue queue) throws JMSException
     {
@@ -1408,7 +1420,7 @@ public abstract class AMQSession<C exten
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException
     {
         checkNotClosed();
-        AMQTopic dest = checkValidTopic(topic);
+        Topic dest = checkValidTopic(topic);
 
         // AMQTopic dest = new AMQTopic(topic.getTopicName());
         return new TopicSubscriberAdaptor(dest, (C) 
createExclusiveConsumer(dest));
@@ -1428,7 +1440,7 @@ public abstract class AMQSession<C exten
     public TopicSubscriber createSubscriber(Topic topic, String 
messageSelector, boolean noLocal) throws JMSException
     {
         checkNotClosed();
-        AMQTopic dest = checkValidTopic(topic);
+        Topic dest = checkValidTopic(topic);
 
         // AMQTopic dest = new AMQTopic(topic.getTopicName());
         return new TopicSubscriberAdaptor(dest, (C) 
createExclusiveConsumer(dest, messageSelector, noLocal));
@@ -2395,7 +2407,7 @@ public abstract class AMQSession<C exten
     /*
      * I could have combined the last 3 methods, but this way it improves 
readability
      */
-    protected AMQTopic checkValidTopic(Topic topic, boolean durable) throws 
JMSException
+    protected Topic checkValidTopic(Topic topic, boolean durable) throws 
JMSException
     {
         if (topic == null)
         {
@@ -2414,17 +2426,17 @@ public abstract class AMQSession<C exten
                 ("Cannot create a durable subscription with a temporary topic: 
" + topic);
         }
 
-        if (!(topic instanceof AMQTopic))
+        if (!(topic instanceof AMQDestination && topic instanceof 
javax.jms.Topic))
         {
             throw new javax.jms.InvalidDestinationException(
                     "Cannot create a subscription on topic created for another 
JMS Provider, class of topic provided is: "
                     + topic.getClass().getName());
         }
 
-        return (AMQTopic) topic;
+        return topic;
     }
 
-    protected AMQTopic checkValidTopic(Topic topic) throws JMSException
+    protected Topic checkValidTopic(Topic topic) throws JMSException
     {
         return checkValidTopic(topic, false);
     }

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1076516&r1=1076515&r2=1076516&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java 
(original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java 
Thu Mar  3 04:49:50 2011
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
 
 import java.net.URISyntaxException;
 
+import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.Topic;
 
@@ -95,39 +96,47 @@ public class AMQTopic extends AMQDestina
         super(exchangeName, exchangeClass, routingKey, isExclusive, 
isAutoDelete, queueName, isDurable,bindingKeys);
     }
 
-    public static AMQTopic createDurableTopic(AMQTopic topic, String 
subscriptionName, AMQConnection connection)
+    public static AMQTopic createDurableTopic(Topic topic, String 
subscriptionName, AMQConnection connection)
             throws JMSException
     {
-        if (topic.getDestSyntax() == DestSyntax.ADDR)
+        if (topic instanceof AMQDestination && topic instanceof 
javax.jms.Topic)
         {
-            try
+            AMQDestination qpidTopic = (AMQDestination)topic;
+            if (qpidTopic.getDestSyntax() == DestSyntax.ADDR)
             {
-                AMQTopic t = new AMQTopic(topic.getAddress());
-                AMQShortString queueName = 
getDurableTopicQueueName(subscriptionName, connection);
-                // link is never null if dest was created using an address 
string.
-                t.getLink().setName(queueName.asString());               
-                t.getSourceNode().setAutoDelete(false);
-                t.getSourceNode().setDurable(true);
-                
-                // The legacy fields are also populated just in case.
-                t.setQueueName(queueName);
-                t.setAutoDelete(false);
-                t.setDurable(true);
-                return t;
+                try
+                {
+                    AMQTopic t = new AMQTopic(qpidTopic.getAddress());
+                    AMQShortString queueName = 
getDurableTopicQueueName(subscriptionName, connection);
+                    // link is never null if dest was created using an address 
string.
+                    t.getLink().setName(queueName.asString());               
+                    t.getSourceNode().setAutoDelete(false);
+                    t.getSourceNode().setDurable(true);
+                    
+                    // The legacy fields are also populated just in case.
+                    t.setQueueName(queueName);
+                    t.setAutoDelete(false);
+                    t.setDurable(true);
+                    return t;
+                }
+                catch(Exception e)
+                {
+                    JMSException ex = new JMSException("Error creating durable 
topic");
+                    ex.initCause(e);
+                    ex.setLinkedException(e);
+                    throw ex;
+                }
             }
-            catch(Exception e)
+            else
             {
-                JMSException ex = new JMSException("Error creating durable 
topic");
-                ex.initCause(e);
-                ex.setLinkedException(e);
-                throw ex;
+                return new AMQTopic(qpidTopic.getExchangeName(), 
qpidTopic.getRoutingKey(), false,
+                                getDurableTopicQueueName(subscriptionName, 
connection),
+                                true);
             }
         }
         else
         {
-            return new AMQTopic(topic.getExchangeName(), 
topic.getRoutingKey(), false,
-                            getDurableTopicQueueName(subscriptionName, 
connection),
-                            true);
+            throw new InvalidDestinationException("The destination object used 
is not from this provider or of type javax.jms.Topic");
         }
     }
 

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1076516&r1=1076515&r2=1076516&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
 Thu Mar  3 04:49:50 2011
@@ -31,12 +31,18 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSession;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
 import javax.naming.Context;
 
 import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession_0_10;
 import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
@@ -796,4 +802,46 @@ public class AddressBasedDestinationTest
         {            
         }
     }
+    
+    public void testQueueReceiversAndTopicSubscriber() throws Exception
+    {
+        Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}");
+        Topic topic = new AMQAnyDestination("ADDR:amq.topic/test");
+        
+        QueueSession qSession = 
((AMQConnection)_connection).createQueueSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        QueueReceiver receiver = qSession.createReceiver(queue);
+        
+        TopicSession tSession = 
((AMQConnection)_connection).createTopicSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber sub = tSession.createSubscriber(topic);
+        
+        Session ssn = _connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer prod1 = 
ssn.createProducer(ssn.createQueue("ADDR:my-queue"));
+        prod1.send(ssn.createTextMessage("test1"));
+        
+        MessageProducer prod2 = 
ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test"));
+        prod2.send(ssn.createTextMessage("test2"));
+        
+        Message msg1 = receiver.receive();
+        assertNotNull(msg1);
+        assertEquals("test1",((TextMessage)msg1).getText());
+        
+        Message msg2 = sub.receive();
+        assertNotNull(msg2);
+        assertEquals("test2",((TextMessage)msg2).getText());  
+    }
+    
+    public void testDurableSubscriber() throws Exception
+    {
+        Session ssn = 
_connection.createSession(false,Session.AUTO_ACKNOWLEDGE);        
+        Topic topic = ssn.createTopic("news.us");
+        
+        MessageConsumer cons = ssn.createDurableSubscriber(topic, "my-sub");
+        MessageProducer prod = ssn.createProducer(topic);
+        
+        Message m = ssn.createTextMessage("A");
+        prod.send(m);
+        Message msg = cons.receive(1000);
+        assertNotNull(msg);
+        assertEquals("A",((TextMessage)msg).getText());
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to