[ https://issues.apache.org/activemq/browse/AMQ-696?page=comments#action_36115 ]
Craig Day commented on AMQ-696:
-------------------------------
Some further information for you. I attached a debugger to the broker and the
testcase.
Here is the problem:
The testcase calls createSession() resulting in the following stack of calls:
wait():-1, java.lang.Object
wait():474, java.lang.Object
waitFor():267, edu.emory.mathcs.backport.java.util.concurrent.FutureTask
get():117, edu.emory.mathcs.backport.java.util.concurrent.FutureTask
getResult():44, org.apache.activemq.transport.FutureResponse
request():69, org.apache.activemq.transport.ResponseCorrelator
syncSendPacket():1108, org.apache.activemq.ActiveMQConnection
ensureConnectionInfoSent():1196, org.apache.activemq.ActiveMQConnection
createSession():252, org.apache.activemq.ActiveMQConnection
run():28, test.click.TestActiveMQ$1
run():595, java.lang.Thread
On the broker, this ends up at:
addConnection():148, org.apache.activemq.broker.region.RegionBroker
addConnection():65, org.apache.activemq.broker.BrokerFilter
addConnection():69, org.apache.activemq.advisory.AdvisoryBroker
addConnection():65, org.apache.activemq.broker.BrokerFilter
addConnection():77, org.apache.activemq.broker.MutableBrokerFilter
processAddConnection():500, org.apache.activemq.broker.AbstractConnection
processAddConnection():82, org.apache.activemq.broker.jmx.ManagedTransportConnection
visit():106, org.apache.activemq.command.ConnectionInfo
service():196, org.apache.activemq.broker.AbstractConnection
onCommand():62, org.apache.activemq.broker.TransportConnection$1
onCommand():93, org.apache.activemq.transport.ResponseCorrelator
onCommand():70, org.apache.activemq.transport.TransportFilter
onCommand():114, org.apache.activemq.transport.WireFormatNegotiator
onCommand():122, org.apache.activemq.transport.InactivityMonitor
doConsume():87, org.apache.activemq.transport.TransportSupport
run():139, org.apache.activemq.transport.tcp.TcpTransport
run():595, java.lang.Thread
where RegionBroker adds the generated clientID to the clientIDSet.
Note that I haven't called conn.start() yet!
The main thread then calls conn.start(), resulting in the following stack of
calls:
wait():-1, java.lang.Object
wait():474, java.lang.Object
waitFor():267, edu.emory.mathcs.backport.java.util.concurrent.FutureTask
get():117, edu.emory.mathcs.backport.java.util.concurrent.FutureTask
getResult():44, org.apache.activemq.transport.FutureResponse
request():69, org.apache.activemq.transport.ResponseCorrelator
syncSendPacket():1108, org.apache.activemq.ActiveMQConnection
ensureConnectionInfoSent():1196, org.apache.activemq.ActiveMQConnection
start():412, org.apache.activemq.ActiveMQConnection
testConnectionFactory():38, test.click.TestActiveMQ
invoke0():-1, sun.reflect.NativeMethodAccessorImpl
invoke():39, sun.reflect.NativeMethodAccessorImpl
invoke():25, sun.reflect.DelegatingMethodAccessorImpl
invoke():585, java.lang.reflect.Method
runTest():154, junit.framework.TestCase
i.e. another call to ensureConnectionInfoSent(), this time from the main
thread, resulting in the following stack of calls on the broker:
addConnection():148, org.apache.activemq.broker.region.RegionBroker
addConnection():65, org.apache.activemq.broker.BrokerFilter
addConnection():69, org.apache.activemq.advisory.AdvisoryBroker
addConnection():65, org.apache.activemq.broker.BrokerFilter
addConnection():77, org.apache.activemq.broker.MutableBrokerFilter
processAddConnection():500, org.apache.activemq.broker.AbstractConnection
processAddConnection():82, org.apache.activemq.broker.jmx.ManagedTransportConnection
visit():106, org.apache.activemq.command.ConnectionInfo
service():196, org.apache.activemq.broker.AbstractConnection
onCommand():62, org.apache.activemq.broker.TransportConnection$1
onCommand():93, org.apache.activemq.transport.ResponseCorrelator
onCommand():70, org.apache.activemq.transport.TransportFilter
onCommand():114, org.apache.activemq.transport.WireFormatNegotiator
onCommand():122, org.apache.activemq.transport.InactivityMonitor
doConsume():87, org.apache.activemq.transport.TransportSupport
run():139, org.apache.activemq.transport.tcp.TcpTransport
run():595, java.lang.Thread
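where the clientID is checked and collides with the clientID deposited by the
first call.
It's pretty obvious from this that ensureConnectionInfoSent() and its handling
of the boolean isConnectionInfoSentToBroker are not thread safe. The flag is
only set after syncSendPacket() returns, so a second thread can pass the check
while the first is still waiting on the broker: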
    /**
     * Send the ConnectionInfo to the Broker
     *
     * @throws JMSException
     */
    protected void ensureConnectionInfoSent() throws JMSException {
        // Can we skip sending the ConnectionInfo packet??
        if (isConnectionInfoSentToBroker || closed.get()) {
            return;
        }
        if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
            info.setClientId(clientIdGenerator.generateId());
        }
        syncSendPacket(info);
        this.isConnectionInfoSentToBroker = true;
        // Add a temp destination advisory consumer so that
        // We know what the valid temporary destinations are on the
        // broker without having to do an RPC to the broker.
        ConsumerId consumerId = new ConsumerId(
                new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
        advisoryConsumer = new AdvisoryConsumer(this, consumerId);
    }
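
For illustration only, here is a minimal sketch of how the check-then-send
sequence could be made atomic. This is not the ActiveMQ code and not
necessarily how the project would fix it: ConnectionInfoGuard, sendLock,
sendConnectionInfo() and race() are hypothetical names, the send is replaced by
a counter so the example runs without a broker, and it assumes
java.util.concurrent (Java 5+) rather than the backport library seen in the
stacks above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the connection class; not ActiveMQ code.
class ConnectionInfoGuard {
    private final Object sendLock = new Object();
    private boolean connectionInfoSent; // guarded by sendLock
    private final AtomicInteger sends = new AtomicInteger();

    // Stand-in for syncSendPacket(info): only counts how often it is called.
    private void sendConnectionInfo() {
        sends.incrementAndGet();
    }

    // The check, the send and the flag update happen under one lock, so a
    // second caller blocks until the first has finished and then skips the send.
    void ensureConnectionInfoSent() {
        synchronized (sendLock) {
            if (connectionInfoSent) {
                return;
            }
            sendConnectionInfo();
            connectionInfoSent = true;
        }
    }

    int sendCount() {
        return sends.get();
    }

    // Releases threadCount threads at once against a fresh guard and returns
    // how many sends actually happened.
    static int race(int threadCount) {
        final ConnectionInfoGuard guard = new ConnectionInfoGuard();
        final CountDownLatch startGate = new CountDownLatch(1);
        Thread[] workers = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        startGate.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    guard.ensureConnectionInfoSent();
                }
            });
            workers[i].start();
        }
        startGate.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return guard.sendCount();
    }
}
```

Holding the lock across the send means a concurrent caller (conn.start() in my
testcase) simply waits for the first ConnectionInfo round trip to complete
instead of issuing a second one. Whether blocking there is acceptable in the
real client is a design decision for the committers.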
> Client: XXX already connected exception when connection started after
> consumers added
> -------------------------------------------------------------------------------------
>
> Key: AMQ-696
> URL: https://issues.apache.org/activemq/browse/AMQ-696
> Project: ActiveMQ
> Type: Bug
> Components: Broker
> Versions: 4.0 RC 2, 4.0 RC3
> Environment: WinXP
> Reporter: Craig Day
>
>
> While using the new Spring-2.0 DefaultMessageListenerContainer I can reliably
> reproduce the following exception on the broker side which usually results in
> a hang on the client side:
>
> The broker logs the following exception:
>
> INFO Service - Sync error occurred: javax.jms.InvalidClientIDException: Broker: localhost - Client: ID:inspiron-1410-1146192747453-2:1 already connected
> javax.jms.InvalidClientIDException: Broker: localhost - Client: ID:inspiron-1410-1146192747453-2:1 already connected
> at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:154)
> at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:65)
> at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:69)
> at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:65)
> at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:77)
> at org.apache.activemq.broker.AbstractConnection.processAddConnection(AbstractConnection.java:500)
> at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:82)
> at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:106)
> at org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:196)
> at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:62)
> at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:93)
> at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:70)
> at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:114)
> at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:122)
> at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:87)
> at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:139)
> at java.lang.Thread.run(Thread.java:595)
>
> I have extrapolated the sequence of calls that
> DefaultMessageListenerContainer is making and managed to produce a simple
> test case that reproduces the problem:
>
> import junit.framework.TestCase;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.command.ActiveMQQueue;
>
> import javax.jms.*;
>
> public class TestActiveMQ extends TestCase {
>
>     public void testConnectionFactory() throws Exception {
>         final ActiveMQConnectionFactory cf =
>                 new ActiveMQConnectionFactory("tcp://localhost:61616");
>         final ActiveMQQueue queue = new ActiveMQQueue("testqueue");
>         final Connection conn = cf.createConnection();
>
>         Runnable r = new Runnable() {
>             public void run() {
>                 try {
>                     Session session = conn.createSession(false, 1);
>                     MessageConsumer consumer = session.createConsumer(queue, null);
>                     Message msg = consumer.receive(1000);
>                 } catch (JMSException e) {
>                     e.printStackTrace();
>                 }
>             }
>         };
>         new Thread(r).start();
>         conn.start();
>
>         try {
>             synchronized (this) {
>                 wait(3000);
>             }
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         }
>     }
> }
>
> Let us know if you need any more information. I don't want to scrub ActiveMQ
> from my list of candidates if I can help it.
>
> cheers
> craig
>