bridge transport is stuck on socket write at both ends
------------------------------------------------------
Key: AMQ-3591
URL: https://issues.apache.org/jira/browse/AMQ-3591
Project: ActiveMQ
Issue Type: Bug
Components: Transport
Affects Versions: 5.5.0
Environment: 2 Red Hat Linux hosts
Reporter: Leon Fleysher
We have broker A embedded in a JBoss 7 web app on one Linux host, connecting
through a TCP network bridge (duplex=true) to a standalone broker B on another
host. The two brokers exchange messages through 4 common queues. Each broker
has its own persistence store in an MSSQL database.
After a few hours of flawless operation, the bridge stops delivering messages
in both directions.
Broker A offending thread stack trace (this thread causes the rest to wait on
the oneway mutex):
BrokerService[as2analyticsEmbedded] Task-15 [RUNNABLE, IN_NATIVE] CPU time: 0:00
java.net.SocketOutputStream.socketWrite0(FileDescriptor, byte[], int, int)
java.net.SocketOutputStream.socketWrite(byte[], int, int)
java.net.SocketOutputStream.write(byte[], int, int)
org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(byte[], int, int)
java.io.DataOutputStream.write(byte[], int, int)
org.apache.activemq.openwire.v7.BaseDataStreamMarshaller.tightMarshalByteSequence2(ByteSequence, DataOutput, BooleanStream)
org.apache.activemq.openwire.v7.MessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
org.apache.activemq.openwire.v7.ActiveMQMessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
org.apache.activemq.openwire.v7.ActiveMQObjectMessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
org.apache.activemq.openwire.OpenWireFormat.marshal(Object, DataOutput)
org.apache.activemq.transport.tcp.TcpTransport.oneway(Object)
org.apache.activemq.transport.InactivityMonitor.oneway(Object)
org.apache.activemq.transport.TransportFilter.oneway(Object)
org.apache.activemq.transport.WireFormatNegotiator.oneway(Object)
org.apache.activemq.transport.failover.FailoverTransport.oneway(Object)
org.apache.activemq.transport.MutexTransport.oneway(Object)
org.apache.activemq.transport.ResponseCorrelator.asyncRequest(Object, ResponseCallback)
org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(Command)
org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport, TransportListener, Object)
org.apache.activemq.transport.vm.VMTransport.oneway(Object)
org.apache.activemq.transport.MutexTransport.oneway(Object)
org.apache.activemq.transport.ResponseCorrelator.oneway(Object)
org.apache.activemq.broker.TransportConnection.dispatch(Command)
org.apache.activemq.broker.TransportConnection.processDispatch(Command)
org.apache.activemq.broker.TransportConnection.iterate()
org.apache.activemq.thread.PooledTaskRunner.runTask()
org.apache.activemq.thread.PooledTaskRunner$1.run()
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()
Broker B offending thread stack trace (this thread causes the rest to wait on
the oneway mutex):
ActiveMQ Connection Dispatcher: vm://analyticsCentral#6 [RUNNABLE, IN_NATIVE] CPU time: 0:00
java.net.SocketOutputStream.socketWrite0(FileDescriptor, byte[], int, int)
java.net.SocketOutputStream.socketWrite(byte[], int, int)
java.net.SocketOutputStream.write(byte[], int, int)
org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush()
java.io.DataOutputStream.flush()
org.apache.activemq.transport.tcp.TcpTransport.oneway(Object)
org.apache.activemq.transport.InactivityMonitor.oneway(Object)
org.apache.activemq.transport.TransportFilter.oneway(Object)
org.apache.activemq.transport.WireFormatNegotiator.oneway(Object)
org.apache.activemq.transport.MutexTransport.oneway(Object)
org.apache.activemq.transport.ResponseCorrelator.asyncRequest(Object, ResponseCallback)
org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(Command)
org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport, TransportListener, Object)
org.apache.activemq.transport.vm.VMTransport.oneway(Object)
org.apache.activemq.transport.MutexTransport.oneway(Object)
org.apache.activemq.transport.ResponseCorrelator.oneway(Object)
org.apache.activemq.broker.TransportConnection.dispatch(Command)
org.apache.activemq.broker.TransportConnection.processDispatch(Command)
org.apache.activemq.broker.TransportConnection.iterate()
org.apache.activemq.thread.DedicatedTaskRunner.runTask()
org.apache.activemq.thread.DedicatedTaskRunner$1.run()
Both brokers run on a local 1 Gbit network. The hang always occurs after a
few hours of operation and can be reproduced consistently.
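For illustration only, here is a minimal sketch of one way two TCP peers can
both end up stuck in a socket write: each side keeps writing while neither
side drains its receive buffer. That neither broker is reading is an
assumption, not something the traces above prove. The class and method names
(WriteStallDemo, fillUntilStalled, run) are hypothetical and this is not
ActiveMQ code. The sketch uses non-blocking channels so it terminates; with
blocking sockets, as in the traces, the same condition would leave write()
blocked in native code instead of returning 0.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class WriteStallDemo {

    /** Writes until the kernel accepts no more bytes; returns the total bytes queued. */
    static long fillUntilStalled(SocketChannel ch) throws IOException, InterruptedException {
        ByteBuffer chunk = ByteBuffer.allocate(64 * 1024);
        long total = 0;
        int zeroWrites = 0;
        // Require several consecutive zero-byte writes so that a transient
        // pause is not mistaken for a permanent stall.
        while (zeroWrites < 5) {
            chunk.clear();
            int n = ch.write(chunk);
            if (n == 0) {
                zeroWrites++;
                Thread.sleep(20);
            } else {
                zeroWrites = 0;
                total += n;
            }
        }
        return total;
    }

    /** Returns {bytes queued by side A, bytes queued by side B} before each side stalled. */
    static long[] run() throws IOException, InterruptedException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 lets the OS pick a free loopback port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel a = SocketChannel.open(server.getLocalAddress());
                 SocketChannel b = server.accept()) {
                a.configureBlocking(false);
                b.configureBlocking(false);
                // Neither side ever reads, so both directions fill up and stall.
                long fromA = fillUntilStalled(a);
                long fromB = fillUntilStalled(b);
                return new long[] {fromA, fromB};
            }
        }
    }

    public static void main(String[] args) throws Exception {
        long[] queued = run();
        System.out.println("A stalled after " + queued[0] + " bytes, B after " + queued[1] + " bytes");
    }
}
```

The byte counts depend on the operating system's socket buffer sizes, so they
will differ between hosts. The point is only that both directions stop
accepting data once nobody is reading.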
Broker A relevant configuration:
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd"
>
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="as2analyticsEmbedded" persistent="true"
dataDirectory="/home/as2/amq_storage" destroyApplicationContextOnStop="true">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" producerFlowControl="false"
memoryLimit="1gb">
<pendingQueuePolicy>
<fileQueueCursor/>
</pendingQueuePolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<networkConnectors>
<networkConnector uri="static:(failover:(tcp://10.0.213.38:61616))"
name="bridge"
duplex="true"
conduitSubscriptions="true"
decreaseNetworkConsumerPriority="false">
</networkConnector>
</networkConnectors>
<!-- Both FS and DB must be provided. AMQ will use the DB for long-term
storage. FS contents that have not been consumed yet are stored to the DB
from time to time. -->
<persistenceAdapter>
<jdbcPersistenceAdapter
dataDirectory="/home/as2/amq_storage/as2analyticsEmbedded"
dataSource="#mssql-ds" useDatabaseLock="false">
<adapter>
<imageBasedJDBCAdaptor/>
</adapter>
</jdbcPersistenceAdapter>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="2gb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="80gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="10gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="as2analyticsEmbeddedConnector"
uri="vm://as2analyticsEmbedded"/>
</transportConnectors>
</broker>
<bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName"
value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
<property name="url"
value="jdbc:sqlserver://10.0.213.159:1433;databaseName=obfuscated"/>
<property name="username" value="obfuscated"/>
<property name="password" value="obfuscated"/>
<property name="initialSize" value="1"/>
<property name="maxActive" value="100"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
</beans>
Broker B relevant configuration:
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration
file -->
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.base}/conf/credentials.properties</value>
</property>
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="analyticsCentral" dataDirectory="${activemq.base}/data"
destroyApplicationContextOnStop="true">
<!--
For better performance, use the VM cursor and a small memory limit.
For more information, see:
http://activemq.apache.org/message-cursors.html
Also, if your producer is "hanging", it's probably due to producer
flow control.
For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" producerFlowControl="false"
memoryLimit="1gb">
<pendingQueuePolicy>
<fileQueueCursor/>
</pendingQueuePolicy>
</policyEntry>
<policyEntry queue=">">
<deadLetterStrategy>
<individualDeadLetterStrategy
queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data"
dataSource="#mssql-ds" useDatabaseLock="false">
<adapter>
<imageBasedJDBCAdaptor/>
</adapter>
</jdbcPersistenceAdapter>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="2gb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="10gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="5gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
</broker>
<amq:connectionFactory id="jmsFactory" brokerURL="tcp://localhost">
<amq:redeliveryPolicy>
<amq:redeliveryPolicy maximumRedeliveries="0"/>
</amq:redeliveryPolicy>
</amq:connectionFactory>
<bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName"
value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
<property name="url"
value="jdbc:sqlserver://10.0.213.159:1433;DatabaseName=obfuscated"/>
<property name="username" value="obfuscated"/>
<property name="password" value="obfuscated"/>
<property name="initialSize" value="1"/>
<property name="maxActive" value="100"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
<!--
Enable web consoles, REST and Ajax APIs and demos
It also includes Camel (with its web console), see
${ACTIVEMQ_HOME}/conf/camel.xml for more info
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans>