Author: grkvlt
Date: Tue Dec 14 14:20:36 2010
New Revision: 1049101

URL: http://svn.apache.org/viewvc?rev=1049101&view=rev
Log:
QPID-2970: Add dlq proprty to didsable dlq creation in tests

Modified:
    qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/config.properties
    
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java
    
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
    
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java
    
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java
    
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
    
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java
    
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java

Modified: qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/config.properties
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/config.properties?rev=1049101&r1=1049100&r2=1049101&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/config.properties 
(original)
+++ qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/config.properties Tue 
Dec 14 14:20:36 2010
@@ -41,6 +41,8 @@ queue = test
 ## dlq properties
 ##
 
+# should the queue be created with dead letter queue enabled
+dlq = true
 # maximum times a message will be redelivered before dlq
 maxRedelivery = 3
 # maxRecords must be greater than max(maxPrefetch, count) * threads

Modified: 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java?rev=1049101&r1=1049100&r2=1049101&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java
 Tue Dec 14 14:20:36 2010
@@ -42,16 +42,19 @@ public class Check extends Client
     public Integer call() throws Exception {
         start();
         
-        Message msg;
-        while ((msg = _consumer.receive(1000)) != null)
+        if (_dlq)
         {
-               int number = msg.getIntProperty("number");
-               boolean rejectMessage = (number % _reject) == 0;
-               if (!rejectMessage)
-               {
-                   throw new RuntimeException("unexpected message on dlq: " + 
number);
-               }
-               _check++;
+            Message msg;
+            while ((msg = _consumer.receive(1000)) != null)
+            {
+               int number = msg.getIntProperty("number");
+               boolean rejectMessage = (number % _reject) == 0;
+               if (!rejectMessage)
+               {
+                   throw new RuntimeException("unexpected message on dlq: " + 
number);
+               }
+               _check++;
+            }
         }
         return Integer.valueOf(_check);
     }

Modified: 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java?rev=1049101&r1=1049100&r2=1049101&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
 Tue Dec 14 14:20:36 2010
@@ -38,11 +38,12 @@ public abstract class Client implements 
     protected boolean _clientAck;
     protected String _queueName;
     protected int _count;
-    protected boolean _messageIds;
+    protected boolean _messageIdsDisabled;
     protected boolean _persistent;
     protected int _size;
     protected int _threads;
     protected int _maxRecords;
+    protected boolean _dlq;
     
     protected Connection _connection;
     protected Session _session;
@@ -68,9 +69,10 @@ public abstract class Client implements 
         _persistent = Boolean.parseBoolean(_props.getProperty(PERSISTENT));
         _count = Integer.parseInt(_props.getProperty(COUNT));
         _size = Integer.parseInt(_props.getProperty(SIZE));
-        _messageIds = !Boolean.parseBoolean(_props.getProperty(MESSAGE_IDS));
+        _messageIdsDisabled = 
!Boolean.parseBoolean(_props.getProperty(MESSAGE_IDS));
         _threads = Integer.parseInt(_props.getProperty(THREADS));
         _maxRecords = Integer.parseInt(_props.getProperty(MAX_RECORDS));
+        _dlq = Boolean.parseBoolean(_props.getProperty(DLQ));
     }
     
     public void shutdown()

Modified: 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java?rev=1049101&r1=1049100&r2=1049101&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java
 Tue Dec 14 14:20:36 2010
@@ -17,6 +17,7 @@ public interface Config
     String REJECT = "reject";
     String REJECT_COUNT = "rejectCount";
     String REPEAT = "repeat";
+    String DLQ = "dlq";
     
     String SESSION_TRANSACTED = "SESSION_TRANSACTED";
     String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE";

Modified: 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java?rev=1049101&r1=1049100&r2=1049101&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java
 Tue Dec 14 14:20:36 2010
@@ -41,7 +41,7 @@ public class Create extends Client
         _queue = new AMQQueue(burl);
 
         final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put(AMQQueueFactory.X_QPID_DLQ_ENABLED.asString(), true);
+        arguments.put(AMQQueueFactory.X_QPID_DLQ_ENABLED.asString(), _dlq);
         
         ((AMQSession<?,?>) _session).createQueue(new 
AMQShortString(_queueName), false, false, false, arguments);
         ((AMQSession<?,?>) _session).declareAndBind((AMQDestination) new 
AMQQueue("amq.direct", _queueName));
@@ -50,9 +50,12 @@ public class Create extends Client
         while (_consumer.receive(1000) != null);
         _consumer.close();
         
-        _queue = _session.createQueue(_queueName + 
AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
-        _consumer = _session.createConsumer(_queue);
-        while (_consumer.receive(1000) != null);
+        if (_dlq)
+        {
+               _queue = _session.createQueue(_queueName + 
AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+               _consumer = _session.createConsumer(_queue);
+               while (_consumer.receive(1000) != null);
+        }
     }
 }
 

Modified: 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java?rev=1049101&r1=1049100&r2=1049101&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
 Tue Dec 14 14:20:36 2010
@@ -32,6 +32,7 @@ public class Receiver extends Client
     private static AtomicInteger _rejectedCount;
     private static int _consumedCheck;
     private static int _rejectedCheck;
+    private static boolean _sessionOk;
      
     public Receiver(Properties props)
     {
@@ -57,10 +58,10 @@ public class Receiver extends Client
         _reject = Integer.parseInt(_props.getProperty(REJECT));
         _rejectCount = Integer.parseInt(_props.getProperty(REJECT_COUNT));
 
-        boolean sessionOk = (_transacted || _clientAck) ||
+        _sessionOk = (_transacted || _clientAck) ||
                 ((_sessionType == Session.AUTO_ACKNOWLEDGE || _sessionType == 
Session.DUPS_OK_ACKNOWLEDGE) && _listener);
-        _rejectedCheck = (!sessionOk || _messageIds || _maxRedelivery == 0 || 
_rejectCount < _maxRedelivery) ? 0 : _count / _reject;
-        _consumedCheck = (_count - _rejectedCheck); // + (sessionOk ? ((_count 
/ _reject) * _rejectCount) : 0);
+        _rejectedCheck = (!_sessionOk || _messageIdsDisabled || _maxRedelivery 
== 0 || _rejectCount < _maxRedelivery) ? 0 : _count / _reject;
+        _consumedCheck = (_count - _rejectedCheck); // + (_sessionOk ? 
((_count / _reject) * _rejectCount) : 0);
             
         _consumer = _session.createConsumer(_queue);
         
@@ -111,25 +112,28 @@ public class Receiver extends Client
                    }
                 rejectCount = _rejected.get(number) + 1;
                    _rejected.put(number, rejectCount);
-                   if (rejectCount <= _rejectCount)
+                   if (rejectCount <= _rejectCount && rejectCount 
<=_maxRedelivery)
                    {
-                           if (rejectCount == _maxRedelivery)
+                           if (_dlq && _sessionOk && rejectCount == 
_maxRedelivery)
                            {
                                _rejectedCount.incrementAndGet();
                                _log.debug("client " + _client + " rejecting 
message (" + rejectCount + ") " + msg.getJMSMessageID());
                            }
-                    if (rejectCount > _maxRedelivery)
-                    {
-                        throw new RuntimeException("client " + _client + " 
received message " + msg.getJMSMessageID() +
-                                " " + rejectCount + " times");
-                    }
                            if (_transacted)
                            {
+                               _log.debug("client " + _client + " rollback of 
message (" + rejectCount + ") " + msg.getJMSMessageID());
                                _session.rollback();
                            }
                            else
                            {
-                               _session.recover();
+                               if (_sessionOk)
+                               {
+                                       _session.recover();
+                               }
+                               else
+                               {
+                                       rejectMessage = false;
+                               }
                            }
                    }
                    else
@@ -142,6 +146,7 @@ public class Receiver extends Client
                {
                    _receivedCount++;
                        _totalConsumedCount.incrementAndGet();
+                _log.debug("client " + _client + " consumed message " + 
_receivedCount + " of " + _totalConsumedCount.get());
                    if (_transacted)
                    {
                            _session.commit();

Modified: 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java?rev=1049101&r1=1049100&r2=1049101&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java
 Tue Dec 14 14:20:36 2010
@@ -40,7 +40,7 @@ public class Sender extends Client
     {
         _producer = _session.createProducer(_queue);
         _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : 
DeliveryMode.NON_PERSISTENT);
-        _producer.setDisableMessageID(_messageIds);
+        _producer.setDisableMessageID(_messageIdsDisabled);
 
         _connection.start();
     }

Modified: 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java?rev=1049101&r1=1049100&r2=1049101&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
 Tue Dec 14 14:20:36 2010
@@ -104,7 +104,7 @@ public class PerformanceTest
                for (Future<Integer> receive : receives)
                {
                        _consumed += receive.get();
-               }    
+               }
 
                Client check = new Check(_props);
                check.connect();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to