When I start multiple producers from multiple threads on a single
queue, using JmsTemplate with PooledConnectionFactory, I get several of
the following exceptions, and some producer threads get stuck in a wait:

[INFO] Service - Sync error occurred: javax.jms.InvalidClientIDException:
Broker: localhost - Client: ID:kiss-1890-1147183464931-3:0 already connected
<javax.jms.InvalidClientIDException: Broker: localhost - Client:
ID:kiss-1890-1147183464931-3:0 already
connected>javax.jms.InvalidClientIDException: Broker: localhost - Client:
ID:kiss-1890-1147183464931-3:0 already connected
        at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:154)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:65)
        at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:67)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:65)
        at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:77)
        at org.apache.activemq.broker.AbstractConnection.processAddConnection(AbstractConnection.java:500)
        at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:106)
        at org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:196)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:62)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:88)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:70)
        at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:75)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
        at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:63)
        at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:68)
        at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1108)
        at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1196)
        at org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:252)
        at org.apache.activemq.pool.SessionPool.createSession(SessionPool.java:116)
        at org.apache.activemq.pool.SessionPool.makeObject(SessionPool.java:84)
        at org.apache.commons.pool.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:771)
        at org.apache.activemq.pool.SessionPool.borrowSession(SessionPool.java:59)
        at org.apache.activemq.pool.ConnectionPool.createSession(ConnectionPool.java:67)
        at org.apache.activemq.pool.PooledConnection.createSession(PooledConnection.java:123)
        at org.springframework.jms.core.JmsTemplate.createSession(JmsTemplate.java:408)
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:496)
        at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:545)
        at ProducerTest$ProducerRunnable.run(ProducerTest.java:83)

After that, some of the threads continue sending messages, while others are
stuck in a wait: 
[EMAIL PROTECTED], priority=5, in group 'main', status: 'WAIT'
          wait():-1, Object.java
          wait():429, Object.java
          waitFor():267, FutureTask.java
          get():117, FutureTask.java
          getResult():44, FutureResponse.java
          request():69, ResponseCorrelator.java
          syncSendPacket():1108, ActiveMQConnection.java
          ensureConnectionInfoSent():1196, ActiveMQConnection.java
          createSession():252, ActiveMQConnection.java
          createSession():116, SessionPool.java
          makeObject():84, SessionPool.java
          borrowObject():771, GenericObjectPool.java
          borrowSession():59, SessionPool.java
          createSession():67, ConnectionPool.java
          createSession():123, PooledConnection.java
          createSession():408, JmsTemplate.java
          execute():496, JmsTemplate.java
          send():545, JmsTemplate.java

Test code:

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;

import junit.framework.TestCase;

import javax.jms.Destination;
import javax.jms.Session;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;

public class ProducerTest extends TestCase {

    public void testMultipleProducers() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setUseLoggingForShutdownErrors(true);
        broker.setUseShutdownHook(true);
        broker.setBrokerName("localhost");
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.setTransportConnectorURIs(new String[] {"vm://localhost"});
        broker.start();

        ActiveMQConnectionFactory jmsFactory = new ActiveMQConnectionFactory();
        jmsFactory.setBrokerURL("vm://localhost");

        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(jmsFactory);

        JmsTemplate sendTemplate = new JmsTemplate();
        sendTemplate.setPubSubDomain(false);
        sendTemplate.setConnectionFactory(pooledConnectionFactory);

        Destination testDestination = new ActiveMQQueue("TEST_QUEUE");

        int producerCount = 10;
        ProducerRunnable[] producerRunnables = new ProducerRunnable[producerCount];

        for (int i=0; i<producerCount; i++) {
            producerRunnables[i] = new ProducerRunnable(sendTemplate, testDestination, "Producer_" + i);
        }

        for (int i=0; i<producerCount; i++) {
            new Thread(producerRunnables[i]).start();
        }

        System.out.println("Started all producers");

        synchronized (this) {
            wait(60000);
        }

        broker.stop();
    }

    private static class ProducerRunnable implements Runnable {
        private JmsTemplate template;
        private Destination destination;
        private String name;

        public ProducerRunnable(JmsTemplate template, Destination destination, String name) {
            this.template = template;
            this.destination = destination;
            this.name = name;
        }

        public void run() {
            final String messagePrefix = "test";

            System.out.println("Starting producer: " + name);

            for (int i=0; i<1000; i++) {
                final String message = messagePrefix + i;

                template.send(destination, new MessageCreator() {
                    public javax.jms.Message createMessage(Session session) throws JMSException {
                        ObjectMessage objMessage = session.createObjectMessage();
                        objMessage.setObject(message);
                        return objMessage;
                    }
                });

                System.out.println(name + " sent: " + message);
            }
            System.out.println("Producer " + name + " finished");
        }
    }
}

I guess it's some sort of a race condition. If I add some initial delay when
starting up those threads (before sending the first message), then all of
them get a connection and continue sending messages.
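
To illustrate that workaround, here is a minimal sketch in plain Java. It does not use the ActiveMQ or Spring APIs: `LazyConnection`, `ensureRegistered` and `runProducers` are hypothetical stand-ins, and the sketch assumes that the race is in a one-time registration step performed lazily on first use (which is what `ensureConnectionInfoSent` in both traces above suggests, but I have not confirmed it). The main thread performs the first use before any producer thread starts.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class WarmUpSketch {

    /** Hypothetical stand-in for a connection that registers itself lazily on first use. */
    static class LazyConnection {
        private boolean registered = false;
        final AtomicInteger registrations = new AtomicInteger();

        // Guarded check-then-act: at most one caller performs the registration.
        synchronized void ensureRegistered() {
            if (!registered) {
                registrations.incrementAndGet(); // stands in for the one-time handshake
                registered = true;
            }
        }
    }

    /** Warms up the connection, runs the producers, and returns the number of registrations. */
    static int runProducers(int producerCount) {
        final LazyConnection connection = new LazyConnection();

        // Warm-up: trigger the lazy registration once, before any producer starts.
        connection.ensureRegistered();

        final CountDownLatch done = new CountDownLatch(producerCount);
        for (int i = 0; i < producerCount; i++) {
            new Thread(() -> {
                connection.ensureRegistered(); // already registered, so this is a no-op
                done.countDown();
            }).start();
        }

        try {
            done.await(); // wait until every producer has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for producers", e);
        }
        return connection.registrations.get();
    }

    public static void main(String[] args) {
        System.out.println("registrations: " + runProducers(10));
    }
}
```

In the test above, the equivalent would be to use the pooled connection once from the main thread before starting the producer threads. The `synchronized` guard is what keeps the count at one even without the warm-up; the warm-up only moves the one-time step ahead of the concurrent part.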

I've tried 4.0-RC2 and 4.0-RC3. The same thing happens with the tcp
connector/transport.

TIA