Hi Qpid developers,
I am new to this list and to messaging in general, so forgive me if my
question is obvious or has already been answered. [Is there a
searchable archive of this list available somewhere?]
I have a Java program that relies on an AMQP broker for receiving (and
sending) messages. I would like to write JUnit tests for particular
parts of this program. On my own computer I can of course use an
external Qpid broker, but I would like these JUnit tests to work on
whatever computer they are run on. I also don't want to depend on
some remote Qpid broker, as this may cause problems behind a
firewall etc.
To this end, I would like the ability to create a 'virtual' or
'in-memory' broker prior to running the actual test. I think that this
should be possible with the Qpid Java code. However, it doesn't quite
work for me, as I cannot connect to the broker I supposedly created.
The code I wrote is in VirtualBroker.java. When I run it, the output is:
[Broker] BRK-1001 : Startup : Version: 0.8 Build: 1037942
[Broker] BRK-1001 : Startup : Version: 0.8 Build: 1037942
[main] ERROR manager.PrincipalDatabaseAuthenticationManager - Unable to
load custom SASL providers. Qpid custom SASL authenticators unavailable.
registry.getVirtualHostRegistry =
org.apache.qpid.server.virtualhost.VirtualHostRegistry@163956
vhost.getName() = localhost
vhost.getName() = test
vhost.getName() = development
The port for the broker is 15672
The version is 0.8 [Build: 1037942]
Broker started ....
Sleeping 5 seconds
Trying to create a connection
Exception in thread "main" javax.jms.JMSException: Error creating
connection: Connection refused
<stacktrace cut>
Thus, it looks like I can create the in-memory broker, but I cannot
connect to it. Does an in-memory broker require a different kind of
connection URL?
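For reference, the two broker-list forms in question can be sketched as below. This only builds the URL strings and does not touch Qpid. The `BrokerUrls` class and its method names are hypothetical, and whether the `vm://:<port>` form is what a broker created with `TransportConnection.createVMBroker(port)` expects in this 0.8 build is an assumption based on the connection URL format documented for older Qpid Java clients.

```java
// Hypothetical helper (not part of Qpid): builds AMQP connection URL strings only.
class BrokerUrls {

    // TCP form: the client opens a socket to host:port, so a broker must be
    // listening on that port.
    static String tcpUrl(String virtualHost, String host, int port) {
        return "amqp://guest:guest@clientid/" + virtualHost
                + "?brokerlist='tcp://" + host + ":" + port + "'";
    }

    // In-VM form (assumption): no host, the number identifies the in-VM broker
    // instance rather than a TCP port.
    static String vmUrl(String virtualHost, int vmBrokerId) {
        return "amqp://guest:guest@clientid/" + virtualHost
                + "?brokerlist='vm://:" + vmBrokerId + "'";
    }

    public static void main(String[] args) {
        System.out.println(tcpUrl("test", "localhost", 15672));
        System.out.println(vmUrl("test", 15672));
    }
}
```

The URL used in VirtualBroker.java has no scheme in the broker list, which would presumably be treated like the TCP form.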
That said, when I list the open ports on my system, 15672 (the port I
specify in the configuration file) is not among them. Should
it be?
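The open-port check can also be done from Java itself, which may be handy inside a test. The following is a minimal sketch using only the JDK; the `PortProbe` class and `isListening` method are hypothetical names, and the 500 ms timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical helper: reports whether anything accepts TCP connections on host:port.
class PortProbe {

    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() throws if the connection is refused or times out
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Sanity check against a throwaway listener on an ephemeral port
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            System.out.println("throwaway listener reachable: "
                    + isListening(loopback.getHostAddress(), server.getLocalPort(), 500));
        }
        // The port from qpid-config.xml
        System.out.println("15672 reachable: "
                + isListening(loopback.getHostAddress(), 15672, 500));
    }
}
```

If the second line prints false after the broker has started, nothing is listening on TCP port 15672, which would be consistent with the "Connection refused" error above.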
If anyone could shed any light on what I am doing wrong that would be
most appreciated.
Kind regards,
Stijn
PS. I attach the Java source code as well as the configuration files used.
package issrg.aipep.policyupdate;
import java.io.File;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.NamingException;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.server.configuration.BrokerConfig;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.junit.Ignore;
@Ignore
public class VirtualBroker {

    private static final String BASE = "junit/issrg/aipep/policyupdate/";

    private final ConfigurationFileApplicationRegistry registry;
    private final int port;

    public VirtualBroker(File configFile, int port) throws ConfigurationException {
        this.registry = new ConfigurationFileApplicationRegistry(configFile);
        this.port = port;
        try {
            // do one with default ID as well, otherwise we get IllegalStateException
            ApplicationRegistry.initialise(registry);
            ApplicationRegistry.initialise(registry, port);
            registry.configure();
            System.out.println("registry.getVirtualHostRegistry = " + registry.getVirtualHostRegistry());
            for (VirtualHost vhost : registry.getVirtualHostRegistry().getVirtualHosts()) {
                System.out.println("vhost.getName() = " + vhost.getConfiguration().getName());
            }
        } catch (Exception e) {
            System.out.println("Problem with initialising registry!!!");
            e.printStackTrace();
            registry.close();
        }
        BrokerConfig config = ApplicationRegistry.getInstance(port).getBroker();
        System.out.println("The port for the broker is " + config.getPort());
        System.out.println("The version is " + config.getVersion());
    }

    public void startBroker() throws AMQVMBrokerCreationException {
        TransportConnection.createVMBroker(port);
    }

    public void stopBroker() {
        TransportConnection.killVMBroker(port);
        ApplicationRegistry.remove(port);
    }

    public static void main(String[] args)
            throws ConfigurationException, AMQVMBrokerCreationException, NamingException, JMSException {
        File file = new File(BASE + "qpid-config.xml");
        VirtualBroker broker = new VirtualBroker(file, 15672);
        broker.startBroker();
        System.out.println("Broker started .... ");
        System.out.println("Sleeping 5 seconds");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        java.util.Properties properties = new java.util.Properties();
        properties.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY,
                "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
        String connectionUrl = "amqp://guest:guest@clientid/test?brokerlist='localhost:15672'";
        // default connection on localhost
        properties.put("connectionfactory.local", connectionUrl);
        javax.naming.Context context = new javax.naming.InitialContext(properties);
        TopicConnectionFactory factory = (TopicConnectionFactory) context.lookup("local");

        System.out.println("Trying to create a connection");
        TopicConnection connection = factory.createTopicConnection();
        // XXX: double check that we have the right options for the session here
        TopicSession topicSession = connection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
        connection.start();
        System.out.println("Connection started");
    }
}
<?xml version="1.0" encoding="ISO-8859-1"?>
<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
-->
<broker>
<prefix>junit/issrg/aipep/policyupdate</prefix>
<work>junit/issrg/aipep/policyupdate</work>
<conf>junit/issrg/aipep/policyupdate/etc</conf>
<plugin-directory>junit/issrg/aipep/policyupdate/lib/plugins</plugin-directory>
<cache-directory>junit/issrg/aipep/policyupdate/cache</cache-directory>
<connector>
<!-- To enable SSL edit the keystorePath and keystorePassword
and set enabled to true.
To disable Non-SSL port set sslOnly to true -->
<ssl>
<enabled>false</enabled>
<sslOnly>false</sslOnly>
<keystorePath>/path/to/keystore.ks</keystorePath>
<keystorePassword>keystorepass</keystorePassword>
</ssl>
<qpidnio>false</qpidnio>
<protectio>
<enabled>false</enabled>
<readBufferLimitSize>262144</readBufferLimitSize>
<writeBufferLimitSize>262144</writeBufferLimitSize>
</protectio>
<transport>nio</transport>
<port>15672</port>
<sslport>18672</sslport>
<socketReceiveBuffer>32768</socketReceiveBuffer>
<socketSendBuffer>32768</socketSendBuffer>
</connector>
<management>
<enabled>false</enabled>
<jmxport>8999</jmxport>
<ssl>
<enabled>false</enabled>
<!-- Update below path to your keystore location, or run the bin/create-example-ssl-stores(.sh|.bat)
script from within the etc/ folder to generate an example store with self-signed cert -->
<keyStorePath>${conf}/qpid.keystore</keyStorePath>
<keyStorePassword>password</keyStorePassword>
</ssl>
</management>
<advanced>
<filterchain enableExecutorPool="true"/>
<enablePooledAllocator>false</enablePooledAllocator>
<enableDirectBuffers>false</enableDirectBuffers>
<framesize>65535</framesize>
<compressBufferOnQueue>false</compressBufferOnQueue>
<enableJMSXUserID>false</enableJMSXUserID>
<locale>en_US</locale>
</advanced>
<security>
<principal-databases>
<!-- Example use of Base64 encoded MD5 hashes for authentication via CRAM-MD5-Hashed -->
<principal-database>
<name>passwordfile</name>
<class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>
<attributes>
<attribute>
<name>passwordFile</name>
<value>${conf}/passwd</value>
</attribute>
</attributes>
</principal-database>
</principal-databases>
<allow-all />
<msg-auth>false</msg-auth>
<jmx>
<access>${conf}/jmxremote.access</access>
<principal-database>passwordfile</principal-database>
</jmx>
</security>
<virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
<heartbeat>
<delay>0</delay>
<timeoutFactor>2.0</timeoutFactor>
</heartbeat>
<queue>
<auto_register>true</auto_register>
</queue>
<status-updates>ON</status-updates>
</broker>
<?xml version="1.0" encoding="ISO-8859-1"?>
<virtualhosts>
<default>test</default>
<virtualhost>
<name>localhost</name>
<localhost>
<store>
<class>org.apache.qpid.server.store.MemoryMessageStore</class>
</store>
<housekeeping>
<threadCount>2</threadCount>
<expiredMessageCheckPeriod>20000</expiredMessageCheckPeriod>
</housekeeping>
<exchanges>
<exchange>
<type>direct</type>
<name>test.direct</name>
<durable>true</durable>
</exchange>
<exchange>
<type>topic</type>
<name>test.topic</name>
</exchange>
</exchanges>
<queues>
<exchange>amq.direct</exchange>
<maximumQueueDepth>4235264</maximumQueueDepth>
<!-- 4Mb -->
<maximumMessageSize>2117632</maximumMessageSize>
<!-- 2Mb -->
<maximumMessageAge>600000</maximumMessageAge>
<!-- 10 mins -->
<maximumMessageCount>50</maximumMessageCount>
<!-- 50 messages -->
<queue>
<name>queue</name>
</queue>
<queue>
<name>ping</name>
</queue>
<queue>
<name>test-queue</name>
<test-queue>
<exchange>test.direct</exchange>
<durable>true</durable>
</test-queue>
</queue>
<queue>
<name>test-ping</name>
<test-ping>
<exchange>test.direct</exchange>
</test-ping>
</queue>
</queues>
</localhost>
</virtualhost>
<virtualhost>
<name>development</name>
<development>
<store>
<class>org.apache.qpid.server.store.MemoryMessageStore</class>
</store>
<queues>
<minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
<maximumMessageCount>50</maximumMessageCount>
<queue>
<name>queue</name>
<queue>
<exchange>amq.direct</exchange>
<maximumQueueDepth>4235264</maximumQueueDepth>
<!-- 4Mb -->
<maximumMessageSize>2117632</maximumMessageSize>
<!-- 2Mb -->
<maximumMessageAge>600000</maximumMessageAge>
<!-- 10 mins -->
</queue>
</queue>
<queue>
<name>ping</name>
<ping>
<exchange>amq.direct</exchange>
<maximumQueueDepth>4235264</maximumQueueDepth>
<!-- 4Mb -->
<maximumMessageSize>2117632</maximumMessageSize>
<!-- 2Mb -->
<maximumMessageAge>600000</maximumMessageAge>
<!-- 10 mins -->
</ping>
</queue>
</queues>
</development>
</virtualhost>
<virtualhost>
<name>test</name>
<test>
<store>
<class>org.apache.qpid.server.store.MemoryMessageStore</class>
</store>
<queues>
<minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
<maximumMessageCount>50</maximumMessageCount>
<queue>
<name>queue</name>
<queue>
<exchange>amq.direct</exchange>
<maximumQueueDepth>4235264</maximumQueueDepth>
<!-- 4Mb -->
<maximumMessageSize>2117632</maximumMessageSize>
<!-- 2Mb -->
<maximumMessageAge>600000</maximumMessageAge>
<!-- 10 mins -->
</queue>
</queue>
<queue>
<name>ping</name>
<ping>
<exchange>amq.direct</exchange>
<maximumQueueDepth>4235264</maximumQueueDepth>
<!-- 4Mb -->
<maximumMessageSize>2117632</maximumMessageSize>
<!-- 2Mb -->
<maximumMessageAge>600000</maximumMessageAge>
<!-- 10 mins -->
</ping>
</queue>
</queues>
</test>
</virtualhost>
</virtualhosts>
#Generated by JMX Console : Last edited by user:admin
#Tue Jun 12 16:46:39 BST 2007
admin=admin
guest=readonly
user=readwrite
guest:guest
client:guest
server:guest
admin:admin