Author: kwall
Date: Mon Dec 21 12:52:48 2015
New Revision: 1721146
URL: http://svn.apache.org/viewvc?rev=1721146&view=rev
Log:
NO-JIRA: Allow perftests to be run automatically against the qpid-jms-client
(AMQP 1.0)
* Added a simple QpidRestAPIQueueCreator that creates/deletes queues using the
REST API
Added:
qpid/java/trunk/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidRestAPIQueueCreator.java
Modified:
qpid/java/trunk/joramtests/pom.xml
qpid/java/trunk/perftests/etc/perftests-jndi-qpid-jms-client.properties
qpid/java/trunk/perftests/pom.xml
qpid/java/trunk/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
qpid/java/trunk/pom.xml
Modified: qpid/java/trunk/joramtests/pom.xml
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/joramtests/pom.xml?rev=1721146&r1=1721145&r2=1721146&view=diff
==============================================================================
--- qpid/java/trunk/joramtests/pom.xml (original)
+++ qpid/java/trunk/joramtests/pom.xml Mon Dec 21 12:52:48 2015
@@ -34,7 +34,6 @@
<properties>
<joram-jms-tests-version>1.0</joram-jms-tests-version>
- <httpclient-version>4.4</httpclient-version>
<qpid-amqp-1-0-client-jms-version>0.32</qpid-amqp-1-0-client-jms-version>
<qpid-jms-client-version>0.6.0</qpid-jms-client-version>
Modified:
qpid/java/trunk/perftests/etc/perftests-jndi-qpid-jms-client.properties
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/perftests/etc/perftests-jndi-qpid-jms-client.properties?rev=1721146&r1=1721145&r2=1721146&view=diff
==============================================================================
--- qpid/java/trunk/perftests/etc/perftests-jndi-qpid-jms-client.properties
(original)
+++ qpid/java/trunk/perftests/etc/perftests-jndi-qpid-jms-client.properties Mon
Dec 21 12:52:48 2015
@@ -19,17 +19,13 @@
#
#
-brokerHostPortPlain=localhost:5672
-brokerHostPortSsl=localhost:5671
-perftestResultsDirectory=perftestResultsDb
-
-# jms.receiveLocalOnly=true works around QPIDJMS-139
+# TODO jms.receiveLocalOnly=true works around QPIDJMS-139
connectionfactory.connectionfactory =
amqp://localhost:5672/?jms.username=guest&jms.password=guest&jms.receiveLocalOnly=true
-#connectionfactory.sslconnectionfactory =
amqp://${brokerHostPortSsl}/?jms.username=guest&jms.password=guest
+connectionfactory.sslconnectionfactory =
amqps://localhost:5671/?jms.username=guest&jms.password=guest&jms.receiveLocalOnly=true
queue.controllerqueue = controllerqueue
jdbcDriverClass=org.apache.derby.jdbc.EmbeddedDriver
# writes to a results database in ./perftestResultsDb by default.
-jdbcUrl=jdbc:derby:${perftestResultsDirectory};create=true
+jdbcUrl=jdbc:derby:perftestResultsDirectory;create=true
Modified: qpid/java/trunk/perftests/pom.xml
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/perftests/pom.xml?rev=1721146&r1=1721145&r2=1721146&view=diff
==============================================================================
--- qpid/java/trunk/perftests/pom.xml (original)
+++ qpid/java/trunk/perftests/pom.xml Mon Dec 21 12:52:48 2015
@@ -100,6 +100,12 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpclient-version}</version>
+ </dependency>
+
<!-- test deps -->
<dependency>
<groupId>org.apache.qpid</groupId>
@@ -232,7 +238,6 @@
</plugins>
</build>
<profiles>
-
<profile>
<id>qpid-jms-client</id>
<activation>
@@ -284,12 +289,11 @@
</systemProperty>
<systemProperty>
<key>qpid.disttest.queue.creator.class</key>
-
<value>org.apache.qpid.disttest.jms.ExistingQueueDrainer</value>
+
<value>org.apache.qpid.disttest.jms.QpidRestAPIQueueCreator</value>
</systemProperty>
</systemProperties>
</configuration>
</plugin>
-
</plugins>
</build>
<dependencies>
Modified:
qpid/java/trunk/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java?rev=1721146&r1=1721145&r2=1721146&view=diff
==============================================================================
---
qpid/java/trunk/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
(original)
+++
qpid/java/trunk/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
Mon Dec 21 12:52:48 2015
@@ -19,6 +19,7 @@
*/
package org.apache.qpid.disttest.jms;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -32,6 +33,7 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.NamingException;
@@ -53,7 +55,7 @@ public class ControllerJmsDelegate
private final Map<String, Destination> _clientNameToQueueMap = new
ConcurrentHashMap<String, Destination>();
private final Connection _connection;
- private final Destination _controllerQueue;
+ private final Queue _controllerQueue;
private final Session _controllerQueueListenerSession;
private final Session _commandSession;
private QueueCreator _queueCreator;
@@ -65,7 +67,7 @@ public class ControllerJmsDelegate
final ConnectionFactory connectionFactory = (ConnectionFactory)
context.lookup("connectionfactory");
_connection = connectionFactory.createConnection();
_connection.start();
- _controllerQueue = (Destination) context.lookup("controllerqueue");
+ _controllerQueue = (Queue) context.lookup("controllerqueue");
_controllerQueueListenerSession = _connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
_commandSession = _connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
@@ -107,6 +109,7 @@ public class ControllerJmsDelegate
{
try
{
+ createControllerQueue();
final MessageConsumer consumer =
_controllerQueueListenerSession.createConsumer(_controllerQueue);
consumer.setMessageListener(new MessageListener()
{
@@ -116,13 +119,13 @@ public class ControllerJmsDelegate
try
{
String jmsMessageID = message.getJMSMessageID();
- LOGGER.debug("Received message " + jmsMessageID);
+ LOGGER.debug("Received message ID {}", jmsMessageID);
final Command command =
JmsMessageAdaptor.messageToCommand(message);
- LOGGER.debug("Converted message " + jmsMessageID + "
into command: " + command);
+ LOGGER.debug("Converted message ID {} into command
{}", jmsMessageID, command);
processCommandWithFirstSupportingListener(command);
- LOGGER.debug("Finished processing command for message
" + jmsMessageID);
+ LOGGER.debug("Finished processing command for message
ID", jmsMessageID);
}
catch (Exception t)
{
@@ -268,6 +271,16 @@ public class ControllerJmsDelegate
_queueCreator.deleteQueues(_connection, _commandSession, queues);
}
+ private void createControllerQueue() throws JMSException
+ {
+ QueueConfig controllerQueueConfig = new
QueueConfig(_controllerQueue.getQueueName(),
+ true,
+
Collections.<String, Object>emptyMap());
+ _queueCreator.createQueues(_connection,
+ _controllerQueueListenerSession,
+
Collections.singletonList(controllerQueueConfig));
+ }
+
public void addCommandListener(CommandListener commandListener)
{
_commandListeners.add(commandListener);
Added:
qpid/java/trunk/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidRestAPIQueueCreator.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidRestAPIQueueCreator.java?rev=1721146&view=auto
==============================================================================
---
qpid/java/trunk/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidRestAPIQueueCreator.java
(added)
+++
qpid/java/trunk/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidRestAPIQueueCreator.java
Mon Dec 21 12:52:48 2015
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.jms;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.auth.BasicScheme;
+import org.apache.http.impl.client.HttpClients;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.controller.config.QueueConfig;
+
+/**
+ * Assumes Basic-Auth is enabled
+ */
+public class QpidRestAPIQueueCreator implements QueueCreator
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(QpidRestAPIQueueCreator.class);
+ private static int _drainPollTimeout =
Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500);
+ private final HttpHost _management;
+ private final String _managementUser;
+ private final String _managementPassword;
+ private final String _virtualhostnode;
+ private final String _virtualhost;
+ private final String _queueApiUrl;
+
+ public QpidRestAPIQueueCreator()
+ {
+ _managementUser = System.getProperty("perftests.manangement-user",
"guest");
+ _managementPassword =
System.getProperty("perftests.manangement-password", "guest");
+
+ _virtualhostnode =
System.getProperty("perftests.broker-virtualhostnode", "default");
+ _virtualhost = System.getProperty("perftests.broker-virtualhost",
"default");
+
+ _management =
HttpHost.create(System.getProperty("perftests.manangement-url",
"http://localhost:8080"));
+ _queueApiUrl = System.getProperty("perftests.manangement-api-queue",
"/api/latest/queue/%s/%s/%s");
+ }
+
+ @Override
+ public void createQueues(Connection connection, Session session,
List<QueueConfig> configs)
+ {
+ HttpClientContext context = HttpClientContext.create();
+
+ for (QueueConfig queueConfig : configs)
+ {
+ final String queueName = queueConfig.getName();
+ managementCreateQueue(queueName, context);
+ }
+ }
+
+ @Override
+ public void deleteQueues(Connection connection, Session session,
List<QueueConfig> configs)
+ {
+ HttpClientContext context = HttpClientContext.create();
+
+ for (QueueConfig queueConfig : configs)
+ {
+ final String queueName = queueConfig.getName();
+ drainQueue(connection, queueName);
+ managementDeleteQueue(queueName, context);
+ }
+ }
+
+ private void drainQueue(Connection connection, String queueName)
+ {
+ try
+ {
+ int counter = 0;
+ while (queueContainsMessages(connection, queueName))
+ {
+ if (counter == 0)
+ {
+ LOGGER.debug("Draining queue {}", queueName);
+ }
+ counter += drain(connection, queueName);
+ }
+ if (counter > 0)
+ {
+ LOGGER.info("Drained {} message(s) from queue {} ", counter,
queueName);
+ }
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Failed to drain queue " +
queueName, e);
+ }
+ }
+
+ private int drain(Connection connection, String queueName) throws
JMSException
+ {
+ int counter = 0;
+ Session session = null;
+ try
+ {
+ session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer messageConsumer =
session.createConsumer(session.createQueue(queueName));
+ try
+ {
+ while (messageConsumer.receive(_drainPollTimeout) != null)
+ {
+ counter++;
+ }
+ }
+ finally
+ {
+ messageConsumer.close();
+ }
+ }
+ finally
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ return counter;
+ }
+
+ private boolean queueContainsMessages(Connection connection, String
queueName) throws JMSException
+ {
+ Session session = null;
+ try
+ {
+ session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ QueueBrowser browser = null;
+ try
+ {
+ browser =
session.createBrowser(session.createQueue(queueName));
+ return browser.getEnumeration().hasMoreElements();
+ }
+ finally
+ {
+ if (browser != null)
+ {
+ browser.close();
+ }
+ }
+ }
+ finally
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ }
+
+ private void managementCreateQueue(final String name, final
HttpClientContext context)
+ {
+ HttpPut put = new HttpPut(String.format(_queueApiUrl,
_virtualhostnode, _virtualhost, name));
+
+ StringEntity input = createStringEntity("{}");
+ input.setContentType("application/json");
+ put.setEntity(input);
+
+ executeManagement(put, context);
+ }
+
+ private void managementDeleteQueue(final String name, final
HttpClientContext context)
+ {
+ HttpDelete delete = new HttpDelete(String.format(_queueApiUrl,
_virtualhostnode, _virtualhost, name));
+ executeManagement(delete, context);
+ }
+
+ private StringEntity createStringEntity(final String string)
+ {
+ try
+ {
+ return new StringEntity(string);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void executeManagement(final HttpRequest httpRequest, final
HttpClientContext context)
+ {
+ try
+ {
+ UsernamePasswordCredentials credentials = new
UsernamePasswordCredentials(_managementUser, _managementPassword);
+
+ final HttpClient httpClient = HttpClients.createDefault();
+
+ httpRequest.addHeader(new BasicScheme().authenticate(credentials,
httpRequest));
+ final HttpResponse response = httpClient.execute(_management,
httpRequest, context);
+
+
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode != 200 && statusCode != 201)
+ {
+ throw new RuntimeException(String.format("Failed : HTTP error
code : %d status line : %s", statusCode,
+
response.getStatusLine()));
+ }
+
+ }
+ catch (IOException | org.apache.http.auth.AuthenticationException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
Modified: qpid/java/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/pom.xml?rev=1721146&r1=1721145&r2=1721146&view=diff
==============================================================================
--- qpid/java/trunk/pom.xml (original)
+++ qpid/java/trunk/pom.xml Mon Dec 21 12:52:48 2015
@@ -145,6 +145,7 @@
<junit-version>4.11</junit-version>
<mockito-version>1.9.5</mockito-version>
<hamcrest-version>1.3</hamcrest-version>
+ <httpclient-version>4.4</httpclient-version>
<exec-maven-plugin-version>1.3.2</exec-maven-plugin-version>
<javacc-maven-plugin-version>2.6</javacc-maven-plugin-version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]