Author: orudyy
Date: Tue Nov 18 15:36:48 2014
New Revision: 1640371

URL: http://svn.apache.org/r1640371
Log:
QPID-6231: [Perftests framework] Make performance framework

Added:
    
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ExistingQueueDrainer.java
Modified:
    
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java
    
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java
    
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
    
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
    
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
    
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java

Modified: 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java?rev=1640371&r1=1640370&r2=1640371&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java
 (original)
+++ 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java
 Tue Nov 18 15:36:48 2014
@@ -31,6 +31,7 @@ import org.apache.qpid.disttest.Distribu
 import org.apache.qpid.disttest.Visitor;
 import org.apache.qpid.disttest.jms.ClientJmsDelegate;
 import org.apache.qpid.disttest.message.Command;
+import org.apache.qpid.disttest.message.CommandType;
 import org.apache.qpid.disttest.message.ParticipantResult;
 import org.apache.qpid.disttest.message.Response;
 import org.slf4j.Logger;
@@ -66,6 +67,7 @@ public class Client
 
     public void stop()
     {
+        _clientJmsDelegate.sendResponseMessage(new 
Response(_clientJmsDelegate.getClientName(), CommandType.STOP_CLIENT, null));
         _state.set(ClientState.STOPPED);
         _latch.countDown();
     }
@@ -119,7 +121,10 @@ public class Client
         }
         finally
         {
-            _clientJmsDelegate.sendResponseMessage(new 
Response(_clientJmsDelegate.getClientName(), command.getType(), 
responseMessage));
+            if (_state.get() != ClientState.STOPPED)
+            {
+                _clientJmsDelegate.sendResponseMessage(new 
Response(_clientJmsDelegate.getClientName(), command.getType(), 
responseMessage));
+            }
         }
     }
 

Modified: 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java?rev=1640371&r1=1640370&r2=1640371&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java
 (original)
+++ 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java
 Tue Nov 18 15:36:48 2014
@@ -45,6 +45,7 @@ public class ClientCommandVisitor extend
 
     public void visit(final StopClientCommand command)
     {
+
         _client.stop();
     }
 

Modified: 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java?rev=1640371&r1=1640370&r2=1640371&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
 (original)
+++ 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
 Tue Nov 18 15:36:48 2014
@@ -19,12 +19,14 @@
  */
 package org.apache.qpid.disttest.jms;
 
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.ConnectionMetaData;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -63,6 +65,7 @@ public class ClientJmsDelegate
     private final Context _context;
     private final Destination _controllerQueue;
     private final Connection _controllerConnection;
+    private final Session _instructionListenerSession;
     private final Session _controllerSession;
     private final MessageProducer _controlQueueProducer;
 
@@ -87,6 +90,7 @@ public class ClientJmsDelegate
             _controllerConnection = connectionFactory.createConnection();
             _controllerConnection.start();
             _controllerQueue = (Destination) 
context.lookup(DistributedTestConstants.CONTROLLER_QUEUE_JNDI_NAME);
+            _instructionListenerSession = 
_controllerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             _controllerSession = _controllerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
             _controlQueueProducer = 
_controllerSession.createProducer(_controllerQueue);
             _clientName = UUID.randomUUID().toString();
@@ -112,8 +116,8 @@ public class ClientJmsDelegate
     {
         try
         {
-            _instructionQueue = _controllerSession.createTemporaryQueue();
-            final MessageConsumer instructionConsumer = 
_controllerSession.createConsumer(_instructionQueue);
+            _instructionQueue = 
_instructionListenerSession.createTemporaryQueue();
+            final MessageConsumer instructionConsumer = 
_instructionListenerSession.createConsumer(_instructionQueue);
             instructionConsumer.setMessageListener(new MessageListener()
             {
                 @Override
@@ -170,6 +174,10 @@ public class ClientJmsDelegate
                             .getConnectionFactoryName());
             final Connection newConnection = 
connectionFactory.createConnection();
             addConnection(command.getConnectionName(), newConnection);
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Connection " + command.getConnectionName() + " 
is created " + metaDataToString(newConnection.getMetaData()));
+            }
         }
         catch (final NamingException ne)
         {
@@ -183,6 +191,26 @@ public class ClientJmsDelegate
         }
     }
 
+    private String metaDataToString(ConnectionMetaData metaData) throws 
JMSException
+    {
+        StringBuilder sb = new StringBuilder("ConnectionMetaData[");
+        sb.append(" JMSProviderName : " + metaData.getJMSProviderName());
+        sb.append(" JMSVersion : " + metaData.getJMSVersion() + " (" + 
metaData.getJMSMajorVersion() + "." + metaData.getJMSMinorVersion() +")");
+        sb.append(" ProviderVersion : " + metaData.getProviderVersion()+ " (" 
+ metaData.getProviderMajorVersion()+ "." + metaData.getProviderMinorVersion() 
+")" );
+        sb.append(" JMSXPropertyNames : [");
+        Enumeration en = metaData.getJMSXPropertyNames();
+        while(en.hasMoreElements())
+        {
+            sb.append(" ").append(en.nextElement());
+            if( en.hasMoreElements())
+            {
+                sb.append(",");
+            }
+        }
+        sb.append("]]");
+        return sb.toString();
+    }
+
     public void createSession(final CreateSessionCommand command)
     {
         try
@@ -312,6 +340,10 @@ public class ClientJmsDelegate
             // finish.
             _controllerConnection.stop();
 
+            if (_instructionListenerSession != null)
+            {
+                _instructionListenerSession.close();
+            }
             if (_controllerSession != null)
             {
                 _controllerSession.close();

Modified: 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java?rev=1640371&r1=1640370&r2=1640371&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
 (original)
+++ 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
 Tue Nov 18 15:36:48 2014
@@ -54,7 +54,8 @@ public class ControllerJmsDelegate
     private final Map<String, Destination> _clientNameToQueueMap = new 
ConcurrentHashMap<String, Destination>();
     private final Connection _connection;
     private final Destination _controllerQueue;
-    private final Session _session;
+    private final Session _controllerQueueListenerSession;
+    private final Session _commandSession;
     private QueueCreator _queueCreator;
 
     private List<CommandListener> _commandListeners = new 
CopyOnWriteArrayList<CommandListener>();
@@ -65,7 +66,8 @@ public class ControllerJmsDelegate
         _connection = connectionFactory.createConnection();
         _connection.start();
         _controllerQueue = (Destination) context.lookup("controllerqueue");
-        _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _controllerQueueListenerSession = _connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        _commandSession = _connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
         createVendorSpecificQueueCreator();
     }
@@ -105,7 +107,7 @@ public class ControllerJmsDelegate
     {
         try
         {
-            final MessageConsumer consumer = 
_session.createConsumer(_controllerQueue);
+            final MessageConsumer consumer = 
_controllerQueueListenerSession.createConsumer(_controllerQueue);
             consumer.setMessageListener(new MessageListener()
             {
                 @Override
@@ -138,13 +140,25 @@ public class ControllerJmsDelegate
     /** ensures connections are closed, otherwise the JVM may be prevented 
from terminating */
     public void closeConnections()
     {
+        if (_commandSession != null)
+        {
+            try
+            {
+                _commandSession.close();
+            }
+            catch (JMSException e)
+            {
+                LOGGER.error("Unable to close command session", e);
+            }
+        }
+
         try
         {
-            _session.close();
+            _controllerQueueListenerSession.close();
         }
         catch (JMSException e)
         {
-            LOGGER.error("Unable to close session", e);
+            LOGGER.error("Unable to close controller queue listener session", 
e);
         }
 
         try
@@ -182,10 +196,11 @@ public class ControllerJmsDelegate
                             + _clientNameToQueueMap.keySet());
         }
 
+        MessageProducer producer = null;
         try
         {
-            final MessageProducer producer = 
_session.createProducer(clientQueue);
-            final Message message = 
JmsMessageAdaptor.commandToMessage(_session, command);
+            producer =_commandSession.createProducer(clientQueue);
+            Message message = 
JmsMessageAdaptor.commandToMessage(_commandSession, command);
 
             producer.send(message);
         }
@@ -193,6 +208,20 @@ public class ControllerJmsDelegate
         {
             throw new DistributedTestException(e);
         }
+        finally
+        {
+            if (producer != null)
+            {
+                try
+                {
+                    producer.close();
+                }
+                catch (final JMSException e)
+                {
+                    throw new DistributedTestException(e);
+                }
+            }
+        }
     }
 
     private void processCommandWithFirstSupportingListener(Command command)
@@ -214,7 +243,7 @@ public class ControllerJmsDelegate
         Destination clientIntructionQueue;
         try
         {
-            clientIntructionQueue = _session.createQueue(clientQueueName);
+            clientIntructionQueue = 
_commandSession.createQueue(clientQueueName);
         }
         catch (JMSException e)
         {
@@ -225,12 +254,12 @@ public class ControllerJmsDelegate
 
     public void createQueues(List<QueueConfig> queues)
     {
-        _queueCreator.createQueues(_connection, _session, queues);
+        _queueCreator.createQueues(_connection, _commandSession, queues);
     }
 
     public void deleteQueues(List<QueueConfig> queues)
     {
-        _queueCreator.deleteQueues(_connection, _session, queues);
+        _queueCreator.deleteQueues(_connection, _commandSession, queues);
     }
 
     public void addCommandListener(CommandListener commandListener)

Added: 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ExistingQueueDrainer.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ExistingQueueDrainer.java?rev=1640371&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ExistingQueueDrainer.java
 (added)
+++ 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ExistingQueueDrainer.java
 Tue Nov 18 15:36:48 2014
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.jms;
+
+import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.controller.config.QueueConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import java.util.List;
+
+public class ExistingQueueDrainer implements QueueCreator
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ExistingQueueDrainer.class);
+    private static int _drainPollTimeout = 
Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500);
+
+    @Override
+    public void createQueues(Connection connection, Session session, 
List<QueueConfig> configs)
+    {
+    }
+
+    @Override
+    public void deleteQueues(Connection connection, Session session, 
List<QueueConfig> configs)
+    {
+        for (QueueConfig queueConfig : configs)
+        {
+            drainQueue(connection, queueConfig.getName());
+        }
+    }
+
+    private void drainQueue(Connection connection, String queueName)
+    {
+        try
+        {
+            int counter = 0;
+            while (queueContainsMessages(connection, queueName))
+            {
+                if (counter == 0)
+                {
+                    LOGGER.debug("Draining queue {}", queueName);
+                }
+                counter += drain(connection, queueName);
+            }
+            if (counter > 0)
+            {
+                LOGGER.info("Drained {} message(s) from queue {} ", counter, 
queueName);
+            }
+        }
+        catch (JMSException e)
+        {
+            throw new DistributedTestException("Failed to drain queue " + 
queueName, e);
+        }
+    }
+
+    private int drain(Connection connection, String queueName) throws 
JMSException
+    {
+        int counter = 0;
+        Session session = null;
+        try
+        {
+            session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer messageConsumer = 
session.createConsumer(session.createQueue(queueName));
+            try
+            {
+                while (messageConsumer.receive(_drainPollTimeout) != null)
+                {
+                    counter++;
+                }
+            }
+            finally
+            {
+                messageConsumer.close();
+            }
+        }
+        finally
+        {
+            if (session != null)
+            {
+                session.close();
+            }
+        }
+        return counter;
+    }
+
+    private boolean queueContainsMessages(Connection connection, String 
queueName) throws JMSException
+    {
+        Session session = null;
+        try
+        {
+            session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            QueueBrowser browser = null;
+            try
+            {
+                browser = 
session.createBrowser(session.createQueue(queueName));
+                return browser.getEnumeration().hasMoreElements();
+            }
+            finally
+            {
+                if (browser != null)
+                {
+                    browser.close();
+                }
+            }
+        }
+        finally
+        {
+            if (session != null)
+            {
+                session.close();
+            }
+        }
+    }
+}

Modified: 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java?rev=1640371&r1=1640370&r2=1640371&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
 (original)
+++ 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
 Tue Nov 18 15:36:48 2014
@@ -38,7 +38,6 @@ public class QpidQueueCreator implements
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(QpidQueueCreator.class);
     private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new 
FieldTable();
-    private static final String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = 
"qpid.disttest.queue.creator.drainPollTime";
     private static int _drainPollTimeout = 
Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500);
 
     @Override

Modified: 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java?rev=1640371&r1=1640370&r2=1640371&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
 (original)
+++ 
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
 Tue Nov 18 15:36:48 2014
@@ -27,6 +27,8 @@ import org.apache.qpid.disttest.controll
 
 public interface QueueCreator
 {
+    String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = 
"qpid.disttest.queue.creator.drainPollTime";
+
     void createQueues(Connection connection, Session session, 
List<QueueConfig> configs);
     void deleteQueues(Connection connection, Session session, 
List<QueueConfig> configs);
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to