Author: rajith
Date: Thu Jul 22 17:27:24 2010
New Revision: 966763

URL: http://svn.apache.org/viewvc?rev=966763&view=rev
Log:
QPID-2752
Added a test case to create an LVQ from the JMS client using the addressing 
syntax.
Fixed a few bugs in QpidQueueOptions.java.
Modified the MapAccessor to allow any value to be retrieved as a String.

Added:
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java
Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=966763&r1=966762&r2=966763&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
 Thu Jul 22 17:27:24 2010
@@ -66,6 +66,7 @@ public class AddressHelper
     public static final String QUEUE = "queue";
     public static final String KEY = "key";
     public static final String ARGUMENTS = "arguments";
+    public static final String RELIABILITY = "reliability";
     
     private Address address;
     private Accessor addressProps;
@@ -142,13 +143,11 @@ public class AddressHelper
         
         if (args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE) != null)
         {
-            
options.setOrderingPolicy(args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE));
-            options.setLvqKey(args.getString(QpidQueueOptions.QPID_LVQ_KEY));
+            options.setOrderingPolicy(QpidQueueOptions.QPID_LAST_VALUE_QUEUE);
         }
         else if 
(args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE) != null)
         {
-            
options.setOrderingPolicy(args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE));
-            options.setLvqKey(args.getString(QpidQueueOptions.QPID_LVQ_KEY));
+            
options.setOrderingPolicy(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE);
         }
         
         if (args.getString(QpidQueueOptions.QPID_QUEUE_EVENT_GENERATION) != 
null)

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java?rev=966763&r1=966762&r2=966763&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java
 Thu Jul 22 17:27:24 2010
@@ -67,18 +67,19 @@ public class QpidQueueOptions extends Ha
     
     public void setOrderingPolicy(String s)
     {
-        if ("lvq".equals(s))
+        if (QpidQueueOptions.QPID_LAST_VALUE_QUEUE.equals(s))
         {
             this.put(QPID_LAST_VALUE_QUEUE, 1);
         }
-        else if ("lvq_no_browse".equals(s))
+        else if (QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE.equals(s))
         {
             this.put(QPID_LAST_VALUE_QUEUE_NO_BROWSE,1);
         }
         else
         {
             throw new IllegalArgumentException("Invalid Ordering Policy" +
-            " should be one of {lvq|lvq_no_browse}");
+            " should be one of {" + QpidQueueOptions.QPID_LAST_VALUE_QUEUE + 
"|" + 
+            QPID_LAST_VALUE_QUEUE_NO_BROWSE + "}");
         }
     }
     
@@ -87,20 +88,16 @@ public class QpidQueueOptions extends Ha
         this.put(QPID_LVQ_KEY, key);
     }
     
-    public void setQueueEvents(String s)
+    public void setQueueEvents(String value)
     {
-        if (s.equals("enque_only"))
+        if (value != null &&  (value.equals("1") || value.equals("2")))
         {
-            this.put(QPID_QUEUE_EVENT_GENERATION, 1);
-        }
-        else if (s.equals("enque_and_dequeue"))
-        {
-            this.put(QPID_QUEUE_EVENT_GENERATION,2);
+            this.put(QPID_QUEUE_EVENT_GENERATION, value);
         }
         else
         {
-            throw new IllegalArgumentException("Invalid value" +
-            " should be one of {enqueue_only|enqueue_and_dequeue}");
+            throw new IllegalArgumentException("Invalid value for " + 
+                    QPID_QUEUE_EVENT_GENERATION + " should be one of {1|2}");
         }
     }
 }

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java?rev=966763&r1=966762&r2=966763&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java
 Thu Jul 22 17:27:24 2010
@@ -130,7 +130,14 @@ public interface Accessor
         {
             if (source != null && source.containsKey(name))
             {
-                return (String)source.get(name);
+                if (source.get(name) instanceof String)
+                {
+                    return (String)source.get(name);
+                }
+                else
+                {
+                    return String.valueOf(source.get(name));
+                }
             }
             else
             {

Added: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java?rev=966763&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java
 (added)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java
 Thu Jul 22 17:27:24 2010
@@ -0,0 +1,64 @@
+package org.apache.qpid.test.client.queue;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.test.client.destination.AddressBasedDestinationTest;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LVQTest extends QpidBrokerTestCase
+{
+    private static final Logger _logger = 
LoggerFactory.getLogger(LVQTest.class);
+    private Connection _connection;
+    
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _connection = getConnection() ;
+        _connection.start();
+    }
+    
+    @Override
+    public void tearDown() throws Exception
+    {
+        _connection.close();
+        super.tearDown();
+    }
+    
+    public void testLVQQueue() throws Exception
+    {
+        String addr = "ADDR:my-lvq-queue; {create: always, " +
+        "node: {x-bindings: [{exchange : 'amq.direct', key : test}], " +
+        "x-declare:{'qpid.last_value_queue':1}}}";
+                
+        Session ssn = 
_connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        
+        Destination dest = ssn.createQueue(addr);
+        MessageConsumer consumer = ssn.createConsumer(dest);
+        MessageProducer prod = 
ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test"));
+        
+        for (int i=0; i<40; i++)
+        {
+            Message msg = ssn.createTextMessage(String.valueOf(i));
+            msg.setStringProperty("qpid.LVQ_key", String.valueOf(i%10));
+            prod.send(msg);
+        }
+         
+        for (int i=0; i<10; i++)
+        {
+            TextMessage msg = (TextMessage)consumer.receive(500);
+            assertEquals("The last value is not reflected","3" + 
i,msg.getText());
+        }
+        
+        assertNull("There should not be anymore 
messages",consumer.receive(500));
+    }
+    
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to