When I start multiple producers from multiple threads on a single queue,
using JmsTemplate with a PooledConnectionFactory, I get a couple of the
following exceptions, and some producer threads get stuck in a wait:
[INFO] Service - Sync error occurred: javax.jms.InvalidClientIDException: Broker: localhost - Client: ID:kiss-1890-1147183464931-3:0 already connected <javax.jms.InvalidClientIDException: Broker: localhost - Client: ID:kiss-1890-1147183464931-3:0 already connected>
javax.jms.InvalidClientIDException: Broker: localhost - Client: ID:kiss-1890-1147183464931-3:0 already connected
    at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:154)
    at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:65)
    at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:67)
    at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:65)
    at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:77)
    at org.apache.activemq.broker.AbstractConnection.processAddConnection(AbstractConnection.java:500)
    at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:106)
    at org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:196)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:62)
    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:88)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:70)
    at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:75)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
    at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:63)
    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:68)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1108)
    at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1196)
    at org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:252)
    at org.apache.activemq.pool.SessionPool.createSession(SessionPool.java:116)
    at org.apache.activemq.pool.SessionPool.makeObject(SessionPool.java:84)
    at org.apache.commons.pool.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:771)
    at org.apache.activemq.pool.SessionPool.borrowSession(SessionPool.java:59)
    at org.apache.activemq.pool.ConnectionPool.createSession(ConnectionPool.java:67)
    at org.apache.activemq.pool.PooledConnection.createSession(PooledConnection.java:123)
    at org.springframework.jms.core.JmsTemplate.createSession(JmsTemplate.java:408)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:496)
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:545)
    at ProducerTest$ProducerRunnable.run(ProducerTest.java:83)
After that, some of the threads continue sending messages, while others are
stuck in a wait:
[EMAIL PROTECTED], priority=5, in group 'main', status: 'WAIT'
wait():-1, Object.java
wait():429, Object.java
waitFor():267, FutureTask.java
get():117, FutureTask.java
getResult():44, FutureResponse.java
request():69, ResponseCorrelator.java
syncSendPacket():1108, ActiveMQConnection.java
ensureConnectionInfoSent():1196, ActiveMQConnection.java
createSession():252, ActiveMQConnection.java
createSession():116, SessionPool.java
makeObject():84, SessionPool.java
borrowObject():771, GenericObjectPool.java
borrowSession():59, SessionPool.java
createSession():67, ConnectionPool.java
createSession():123, PooledConnection.java
createSession():408, JmsTemplate.java
execute():496, JmsTemplate.java
send():545, JmsTemplate.java
Test code:
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import junit.framework.TestCase;
import javax.jms.Destination;
import javax.jms.Session;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;

public class ProducerTest extends TestCase {

    public void testMultipleProducers() throws Exception {
        // Embedded, non-persistent broker reachable over the in-VM transport.
        BrokerService broker = new BrokerService();
        broker.setUseLoggingForShutdownErrors(true);
        broker.setUseShutdownHook(true);
        broker.setBrokerName("localhost");
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.setTransportConnectorURIs(new String[] {"vm://localhost"});
        broker.start();

        // One JmsTemplate, backed by a pooled connection factory, shared by all producers.
        ActiveMQConnectionFactory jmsFactory = new ActiveMQConnectionFactory();
        jmsFactory.setBrokerURL("vm://localhost");
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(jmsFactory);
        JmsTemplate sendTemplate = new JmsTemplate();
        sendTemplate.setPubSubDomain(false);
        sendTemplate.setConnectionFactory(pooledConnectionFactory);
        Destination testDestination = new ActiveMQQueue("TEST_QUEUE");

        int producerCount = 10;
        ProducerRunnable[] producerRunnables = new ProducerRunnable[producerCount];
        for (int i = 0; i < producerCount; i++) {
            producerRunnables[i] = new ProducerRunnable(sendTemplate, testDestination, "Producer_" + i);
        }
        // Start all producers at once, with no delay between them.
        for (int i = 0; i < producerCount; i++) {
            new Thread(producerRunnables[i]).start();
        }
        System.out.println("Started all producers");

        // Give the producers up to 60 seconds before stopping the broker.
        synchronized (this) {
            wait(60000);
        }
        broker.stop();
    }

    private static class ProducerRunnable implements Runnable {
        private JmsTemplate template;
        private Destination destination;
        private String name;

        public ProducerRunnable(JmsTemplate template, Destination destination, String name) {
            this.template = template;
            this.destination = destination;
            this.name = name;
        }

        public void run() {
            final String messagePrefix = "test";
            System.out.println("Starting producer: " + name);
            // Each producer sends 1000 object messages through the shared template.
            for (int i = 0; i < 1000; i++) {
                final String message = messagePrefix + i;
                template.send(destination, new MessageCreator() {
                    public javax.jms.Message createMessage(Session session) throws JMSException {
                        ObjectMessage objMessage = session.createObjectMessage();
                        objMessage.setObject(message);
                        return objMessage;
                    }
                });
                System.out.println(name + " sent: " + message);
            }
            System.out.println("Producer " + name + " finished");
        }
    }
}
I guess it's some sort of race condition. If I add an initial delay when
starting up those threads (before each one sends its first message), then
all of them get a connection and continue sending messages.
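To illustrate the kind of race I suspect, here is a minimal JDK-only sketch. Everything in it is hypothetical: LazyConnection, ensureInfoSent() and runProducers() are names I made up to model a connection that registers itself on first use with an unsynchronized check. It is not ActiveMQ's actual code, only my reading of the traces above, where both the exception and the waiting threads come through ensureConnectionInfoSent(). The sketch also shows a deterministic alternative to the delay: do the first use once on the main thread before the producers start.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class WarmUpSketch {

    // Hypothetical stand-in for a connection that registers with the broker
    // lazily, on first use. This is NOT ActiveMQ's implementation.
    static class LazyConnection {
        private volatile boolean infoSent = false;
        final AtomicInteger registrations = new AtomicInteger();

        // Unsynchronized check-then-act: several threads can all observe
        // infoSent == false and each perform the registration.
        void ensureInfoSent() {
            if (!infoSent) {
                registrations.incrementAndGet(); // models sending the connection info
                infoSent = true;
            }
        }
    }

    // Starts producerCount threads that all use the connection at the same
    // moment and returns how many registrations were performed in total.
    static int runProducers(final LazyConnection connection, int producerCount, boolean warmUp) {
        if (warmUp) {
            // Workaround under test: first use happens once, single-threaded.
            connection.ensureInfoSent();
        }
        final CountDownLatch startSignal = new CountDownLatch(1);
        Thread[] threads = new Thread[producerCount];
        for (int i = 0; i < producerCount; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        startSignal.await(); // line all producers up
                        connection.ensureInfoSent();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            threads[i].start();
        }
        startSignal.countDown(); // release every producer at once
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return connection.registrations.get();
    }

    public static void main(String[] args) {
        int registrations = runProducers(new LazyConnection(), 10, true);
        System.out.println("registrations with warm-up: " + registrations);
    }
}
```

With the warm-up the model registers exactly once. Passing false for warmUp may produce more than one registration, depending on thread timing. In the test above, the analogous step would presumably be a single sendTemplate.send(...) from the main thread before the producer threads are started, but I have not verified that this avoids the exception.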
I've tried 4.0-RC2 and 4.0-RC3. The same thing happens with the tcp
connector/transport.
TIA
--
View this message in context:
http://www.nabble.com/multiple-producers-and-InvalidClientIDException-problem%2C-producers-wait-t1584904.html#a4300897
Sent from the ActiveMQ - User forum at Nabble.com.