Hi QPID developers,

I am new to this list and messaging in general, so forgive me if my question is obvious or has already been answered. [Is there a search-able archive of this list available somewhere?]

I have a Java program that relies on an AMQP broker for receiving (and sending) messages. I would like to write a Junit test for particular parts of this program. On my own computer I can of course use an external QPID broker but I would like these Junit tests to work on whatever computer they are being run on. I also don't want to depend on some remote QPID broker, as this may cause problems when behind a firewall etc.

To this end, I would like the ability to create a 'virtual' or 'in-memory' broker prior to running the actual test. I think that this should be possible with the QPID Java code. However, it doesn't quite work for me, as I cannot connect to the broker I supposedly created.

The code I wrote is in VirtualBroker.java. When I run it, the output is:

[Broker] BRK-1001 : Startup : Version: 0.8 Build: 1037942
[Broker] BRK-1001 : Startup : Version: 0.8 Build: 1037942
[main] ERROR manager.PrincipalDatabaseAuthenticationManager - Unable to load custom SASL providers. Qpid custom SASL authenticators unavailable. registry.getVirtualHostRegistry = org.apache.qpid.server.virtualhost.VirtualHostRegistry@163956
vhost.getName() = localhost
vhost.getName() = test
vhost.getName() = development
The port for the broker is 15672
The version is 0.8 [Build: 1037942]
Broker started ....
Sleeping 5 seconds
Trying to create a connection
Exception in thread "main" javax.jms.JMSException: Error creating connection: Connection refused
<stacktrace cut>

Thus, it looks like I can create the in-memory broker, but I cannot connect to it. Does a in-memory broker require a different kind of connection URL?

That being said, when I check the 'open ports' on my system, then 15672 (the one I specify in the configuration file) is not among those. Should it be?

If anyone could shed any light on what I am doing wrong that would be most appreciated.

Kind regards,

Stijn

PS. I attach the Java source code as well as the configuration files used.

        
package issrg.aipep.policyupdate;

import java.io.File;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.NamingException;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.server.configuration.BrokerConfig;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.junit.Ignore;

@Ignore
public class VirtualBroker {
  
  private static final String BASE = "junit/issrg/aipep/policyupdate/";
  
  private final ConfigurationFileApplicationRegistry registry; 
  private final int port;
  
  public VirtualBroker(File configFile, int port) throws ConfigurationException {
    this.registry = new ConfigurationFileApplicationRegistry(configFile);
    
    
    this.port = port;
    try {
      ApplicationRegistry.initialise(registry); // do one with default ID as well, otherwise we get IllegalStateException
      ApplicationRegistry.initialise(registry, port);   
      registry.configure();
      
      
      System.out.println("registry.getVirtualHostRegistry = " + registry.getVirtualHostRegistry());
      for (VirtualHost vhost : registry.getVirtualHostRegistry().getVirtualHosts()) {
        System.out.println("vhost.getName() = " + vhost.getConfiguration().getName());
      }
    } catch (Exception e) {
      System.out.println("Problem with initialising registry!!!");
      e.printStackTrace();
      registry.close();
    }
   
    BrokerConfig config = ApplicationRegistry.getInstance(port).getBroker();
    System.out.println("The port for the broker is " + config.getPort());    
    System.out.println("The version is " + config.getVersion());
  }
  
  public void startBroker() throws AMQVMBrokerCreationException {
    TransportConnection.createVMBroker(port);  
  }
  
  public void stopBroker() {
    TransportConnection.killVMBroker(port);
    ApplicationRegistry.remove(port);
  }
  
  public static void main(String [] args) throws ConfigurationException, AMQVMBrokerCreationException, NamingException, JMSException {
    File file = new File(BASE + "qpid-config.xml");
    VirtualBroker broker = new VirtualBroker(file, 15672);
    
    broker.startBroker();
    System.out.println("Broker started .... ");
    
    System.out.println("Sleeping 5 seconds");
    try {
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    
    java.util.Properties properties = new java.util.Properties();
    properties.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, 
        "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
    
    String connectionUrl = "amqp://guest:guest@clientid/test?brokerlist='localhost:15672'";
    // default connection on localhost
    properties.put("connectionfactory.local", connectionUrl);   
    javax.naming.Context context = new javax.naming.InitialContext(properties);
    
    TopicConnectionFactory factory = (TopicConnectionFactory) context.lookup("local");
    
    System.out.println("Trying to create a connection");
        
    TopicConnection connection = factory.createTopicConnection();
    // XXX: double check that we have the right options for the session here
    TopicSession topicSession = connection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);    
    connection.start(); 
    
    System.out.println("Connection started");
    }
  

}
<?xml version="1.0" encoding="ISO-8859-1"?>
<!--
 -
 - Licensed to the Apache Software Foundation (ASF) under one
 - or more contributor license agreements.  See the NOTICE file
 - distributed with this work for additional information
 - regarding copyright ownership.  The ASF licenses this file
 - to you under the Apache License, Version 2.0 (the
 - "License"); you may not use this file except in compliance
 - with the License.  You may obtain a copy of the License at
 -
 -   http://www.apache.org/licenses/LICENSE-2.0
 -
 - Unless required by applicable law or agreed to in writing,
 - software distributed under the License is distributed on an
 - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 - KIND, either express or implied.  See the License for the
 - specific language governing permissions and limitations
 - under the License.
 -
 -->
<broker>
    <prefix>junit/issrg/aipep/policyupdate</prefix>
    <work>junit/issrg/aipep/policyupdate</work>
    <conf>junit/issrg/aipep/policyupdate/etc</conf>

    <plugin-directory>junit/issrg/aipep/policyupdate/lib/plugins</plugin-directory>
    <cache-directory>junit/issrg/aipep/policyupdate/cache</cache-directory>
   
    
    <connector>
        <!-- To enable SSL edit the keystorePath and keystorePassword
	     and set enabled to true. 
             To disasble Non-SSL port set sslOnly to true -->
        <ssl>
            <enabled>false</enabled>
            <sslOnly>false</sslOnly>
            <keystorePath>/path/to/keystore.ks</keystorePath>
            <keystorePassword>keystorepass</keystorePassword>
        </ssl>
        <qpidnio>false</qpidnio>
        <protectio>
            <enabled>false</enabled>
            <readBufferLimitSize>262144</readBufferLimitSize>
            <writeBufferLimitSize>262144</writeBufferLimitSize>	    
        </protectio>
        <transport>nio</transport>
        <port>15672</port>
        <sslport>18672</sslport>
        <socketReceiveBuffer>32768</socketReceiveBuffer>
        <socketSendBuffer>32768</socketSendBuffer>
    </connector>
    <management>
        <enabled>false</enabled>
        <jmxport>8999</jmxport>
        <ssl>
            <enabled>false</enabled>
            <!-- Update below path to your keystore location, or run the bin/create-example-ssl-stores(.sh|.bat)
                 script from within the etc/ folder to generate an example store with self-signed cert -->
            <keyStorePath>${conf}/qpid.keystore</keyStorePath>
            <keyStorePassword>password</keyStorePassword>
        </ssl>
    </management>
    <advanced>
        <filterchain enableExecutorPool="true"/>
        <enablePooledAllocator>false</enablePooledAllocator>
        <enableDirectBuffers>false</enableDirectBuffers>
        <framesize>65535</framesize>
        <compressBufferOnQueue>false</compressBufferOnQueue>
        <enableJMSXUserID>false</enableJMSXUserID>
        <locale>en_US</locale>	
    </advanced>

    <security>
        <principal-databases>
            <!-- Example use of Base64 encoded MD5 hashes for authentication via CRAM-MD5-Hashed -->
            <principal-database>
                <name>passwordfile</name>
                <class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>
                <attributes>
                    <attribute>
                        <name>passwordFile</name>
                        <value>${conf}/passwd</value>
                    </attribute>
                </attributes>
            </principal-database>
        </principal-databases>

        <allow-all />
        
        <msg-auth>false</msg-auth>
        
        <jmx>
            <access>${conf}/jmxremote.access</access>
            <principal-database>passwordfile</principal-database>
        </jmx>
    </security>

    
<virtualhosts>${conf}/virtualhosts.xml</virtualhosts>

    
    <heartbeat>
        <delay>0</delay>
        <timeoutFactor>2.0</timeoutFactor>
    </heartbeat>
    <queue>
        <auto_register>true</auto_register>
    </queue>

    <status-updates>ON</status-updates>

</broker>

<?xml version="1.0" encoding="ISO-8859-1"?>
<!--
 -
 - Licensed to the Apache Software Foundation (ASF) under one
 - or more contributor license agreements.  See the NOTICE file
 - distributed with this work for additional information
 - regarding copyright ownership.  The ASF licenses this file
 - to you under the Apache License, Version 2.0 (the
 - "License"); you may not use this file except in compliance
 - with the License.  You may obtain a copy of the License at
 -
 -   http://www.apache.org/licenses/LICENSE-2.0
 -
 - Unless required by applicable law or agreed to in writing,
 - software distributed under the License is distributed on an
 - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 - KIND, either express or implied.  See the License for the
 - specific language governing permissions and limitations
 - under the License.
 -
 -->
<virtualhosts>
    <default>test</default>
    <virtualhost>
        <name>localhost</name>
        <localhost>
            <store>
                <class>org.apache.qpid.server.store.MemoryMessageStore
                </class>
            </store>

            <housekeeping>
                <threadCount>2</threadCount>
                <expiredMessageCheckPeriod>20000</expiredMessageCheckPeriod>
            </housekeeping>

            <exchanges>
                <exchange>
                    <type>direct</type>
                    <name>test.direct</name>
                    <durable>true</durable>
                </exchange>
                <exchange>
                    <type>topic</type>
                    <name>test.topic</name>
                </exchange>
            </exchanges>
            <queues>
                <exchange>amq.direct</exchange>
                <maximumQueueDepth>4235264</maximumQueueDepth>
                <!-- 4Mb -->
                <maximumMessageSize>2117632</maximumMessageSize>
                <!-- 2Mb -->
                <maximumMessageAge>600000</maximumMessageAge>
                <!-- 10 mins -->
                <maximumMessageCount>50</maximumMessageCount>
                <!-- 50 messages -->

                <queue>
                    <name>queue</name>
                </queue>
                <queue>
                    <name>ping</name>
                </queue>
                <queue>
                    <name>test-queue</name>
                    <test-queue>
                        <exchange>test.direct</exchange>
                        <durable>true</durable>
                    </test-queue>
                </queue>
                <queue>
                    <name>test-ping</name>
                    <test-ping>
                        <exchange>test.direct</exchange>
                    </test-ping>
                </queue>

            </queues>
        </localhost>
    </virtualhost>


    <virtualhost>
        <name>development</name>
        <development>
            <store>
                <class>org.apache.qpid.server.store.MemoryMessageStore
                </class>
            </store>

            <queues>
                <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
                <maximumMessageCount>50</maximumMessageCount>
                <queue>
                    <name>queue</name>
                    <queue>
                        <exchange>amq.direct</exchange>
                        <maximumQueueDepth>4235264</maximumQueueDepth>
                        <!-- 4Mb -->
                        <maximumMessageSize>2117632</maximumMessageSize>
                        <!-- 2Mb -->
                        <maximumMessageAge>600000</maximumMessageAge>
                        <!-- 10 mins -->
                    </queue>
                </queue>
                <queue>
                    <name>ping</name>
                    <ping>
                        <exchange>amq.direct</exchange>
                        <maximumQueueDepth>4235264</maximumQueueDepth>
                        <!-- 4Mb -->
                        <maximumMessageSize>2117632</maximumMessageSize>
                        <!-- 2Mb -->
                        <maximumMessageAge>600000</maximumMessageAge>
                        <!-- 10 mins -->
                    </ping>
                </queue>
            </queues>
        </development>
    </virtualhost>
    <virtualhost>
        <name>test</name>
        <test>
            <store>
                <class>org.apache.qpid.server.store.MemoryMessageStore
                </class>
            </store>

            <queues>
                <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
                <maximumMessageCount>50</maximumMessageCount>
                <queue>
                    <name>queue</name>
                    <queue>
                        <exchange>amq.direct</exchange>
                        <maximumQueueDepth>4235264</maximumQueueDepth>
                        <!-- 4Mb -->
                        <maximumMessageSize>2117632</maximumMessageSize>
                        <!-- 2Mb -->
                        <maximumMessageAge>600000</maximumMessageAge>
                        <!-- 10 mins -->
                    </queue>
                </queue>
                <queue>
                    <name>ping</name>
                    <ping>
                        <exchange>amq.direct</exchange>
                        <maximumQueueDepth>4235264</maximumQueueDepth>
                        <!-- 4Mb -->
                        <maximumMessageSize>2117632</maximumMessageSize>
                        <!-- 2Mb -->
                        <maximumMessageAge>600000</maximumMessageAge>
                        <!-- 10 mins -->
                    </ping>
                </queue>
            </queues>
        </test>
    </virtualhost>
</virtualhosts>

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

#Generated by JMX Console : Last edited by user:admin
#Tue Jun 12 16:46:39 BST 2007
admin=admin
guest=readonly
user=readwrite
guest:guest
client:guest
server:guest
admin:admin

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to