bridge transport is stuck on socket write at both ends
------------------------------------------------------

                 Key: AMQ-3591
                 URL: https://issues.apache.org/jira/browse/AMQ-3591
             Project: ActiveMQ
          Issue Type: Bug
          Components: Transport
    Affects Versions: 5.5.0
         Environment: Two Red Hat Linux hosts
            Reporter: Leon Fleysher


Broker A is embedded in a JBoss 7 web application on one Linux host and
connects through a TCP network bridge with duplex=true to broker B, a
standalone broker on a second Linux host. The two brokers exchange messages
through 4 common queues. Each broker has its own persistence storage in an
MS SQL Server database.

After a few hours of flawless operation the bridge stops delivering messages
in both directions.

Broker A, stack trace of the offending thread (the remaining threads wait on
the oneway mutex):
BrokerService[as2analyticsEmbedded] Task-15 [RUNNABLE, IN_NATIVE] CPU time: 0:00
java.net.SocketOutputStream.socketWrite0(FileDescriptor, byte[], int, int)
java.net.SocketOutputStream.socketWrite(byte[], int, int)
java.net.SocketOutputStream.write(byte[], int, int)
org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(byte[], int, int)
java.io.DataOutputStream.write(byte[], int, int)
org.apache.activemq.openwire.v7.BaseDataStreamMarshaller.tightMarshalByteSequence2(ByteSequence, DataOutput, BooleanStream)
org.apache.activemq.openwire.v7.MessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
org.apache.activemq.openwire.v7.ActiveMQMessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
org.apache.activemq.openwire.v7.ActiveMQObjectMessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
org.apache.activemq.openwire.OpenWireFormat.marshal(Object, DataOutput)
org.apache.activemq.transport.tcp.TcpTransport.oneway(Object)
org.apache.activemq.transport.InactivityMonitor.oneway(Object)
org.apache.activemq.transport.TransportFilter.oneway(Object)
org.apache.activemq.transport.WireFormatNegotiator.oneway(Object)
org.apache.activemq.transport.failover.FailoverTransport.oneway(Object)
org.apache.activemq.transport.MutexTransport.oneway(Object)
org.apache.activemq.transport.ResponseCorrelator.asyncRequest(Object, ResponseCallback)
org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(Command)
org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport, TransportListener, Object)
org.apache.activemq.transport.vm.VMTransport.oneway(Object)
org.apache.activemq.transport.MutexTransport.oneway(Object)
org.apache.activemq.transport.ResponseCorrelator.oneway(Object)
org.apache.activemq.broker.TransportConnection.dispatch(Command)
org.apache.activemq.broker.TransportConnection.processDispatch(Command)
org.apache.activemq.broker.TransportConnection.iterate()
org.apache.activemq.thread.PooledTaskRunner.runTask()
org.apache.activemq.thread.PooledTaskRunner$1.run()
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
java.util.concurrent.ThreadPoolExecutor$Worker.run()
java.lang.Thread.run()

Broker B, stack trace of the offending thread (the remaining threads wait on
the oneway mutex):

ActiveMQ Connection Dispatcher: vm://analyticsCentral#6 [RUNNABLE, IN_NATIVE] CPU time: 0:00
java.net.SocketOutputStream.socketWrite0(FileDescriptor, byte[], int, int)
java.net.SocketOutputStream.socketWrite(byte[], int, int)
java.net.SocketOutputStream.write(byte[], int, int)
org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush()
java.io.DataOutputStream.flush()
org.apache.activemq.transport.tcp.TcpTransport.oneway(Object)
org.apache.activemq.transport.InactivityMonitor.oneway(Object)
org.apache.activemq.transport.TransportFilter.oneway(Object)
org.apache.activemq.transport.WireFormatNegotiator.oneway(Object)
org.apache.activemq.transport.MutexTransport.oneway(Object)
org.apache.activemq.transport.ResponseCorrelator.asyncRequest(Object, ResponseCallback)
org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(Command)
org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(Object)
org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
org.apache.activemq.transport.TransportFilter.onCommand(Object)
org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport, TransportListener, Object)
org.apache.activemq.transport.vm.VMTransport.oneway(Object)
org.apache.activemq.transport.MutexTransport.oneway(Object)
org.apache.activemq.transport.ResponseCorrelator.oneway(Object)
org.apache.activemq.broker.TransportConnection.dispatch(Command)
org.apache.activemq.broker.TransportConnection.processDispatch(Command)
org.apache.activemq.broker.TransportConnection.iterate()
org.apache.activemq.thread.DedicatedTaskRunner.runTask()
org.apache.activemq.thread.DedicatedTaskRunner$1.run()

Both brokers run on a local 1 Gbit network. The hang always occurs after a
few hours of operation and can be reproduced consistently.
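
For reference, a minimal sketch of how threads in this state could be picked
out from inside the JVM instead of from an external thread dump. The class and
method names (StuckWriteFinder, isInSocketWrite, threadsInSocketWrite) are
hypothetical. The frame it matches is the one at the top of both traces above;
that frame name belongs to the JDK socket implementation in use here and may
differ on other JDK versions. A single sample only shows that a thread is in
the write at that instant, so a thread should appear in repeated samples
before it is treated as stuck.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class StuckWriteFinder {
    // Innermost frame seen in both traces above (assumption: same JDK socket implementation).
    static final String WRITE_CLASS = "java.net.SocketOutputStream";
    static final String WRITE_METHOD = "socketWrite0";

    /** Returns true if the innermost frame of the given stack is the native socket write. */
    public static boolean isInSocketWrite(StackTraceElement[] stack) {
        return stack.length > 0
                && WRITE_CLASS.equals(stack[0].getClassName())
                && WRITE_METHOD.equals(stack[0].getMethodName());
    }

    /** Returns the names of all live threads that are inside the native socket write right now. */
    public static List<String> threadsInSocketWrite() {
        List<String> names = new ArrayList<String>();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            if (isInSocketWrite(e.getValue())) {
                names.add(e.getKey().getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // One sample; run periodically and compare results to spot threads that never leave the write.
        for (String name : threadsInSocketWrite()) {
            System.out.println("in socket write: " + name);
        }
    }
}
```

Run inside each broker JVM, this would be expected to report the Task-15
thread on broker A and the Connection Dispatcher thread on broker B for as
long as the bridge is hung.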

Broker A relevant configuration:
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
    http://activemq.apache.org/schema/core
    http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd"
>

    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="as2analyticsEmbedded" persistent="true"
            dataDirectory="/home/as2/amq_storage" destroyApplicationContextOnStop="true">
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry queue=">" producerFlowControl="false" 
memoryLimit="1gb">
                  <pendingQueuePolicy>
                    <fileQueueCursor/>
                  </pendingQueuePolicy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
                <managementContext createConnector="false"/>
        </managementContext>

        <networkConnectors>
            <networkConnector uri="static:(failover:(tcp://10.0.213.38:61616))"
                name="bridge"
                duplex="true"
                conduitSubscriptions="true"
                decreaseNetworkConsumerPriority="false">
            </networkConnector>
        </networkConnectors>

        <!-- Both FS and DB must be provided. AMQ uses the DB for long-term
        storage. FS contents that have not been consumed yet are stored to the
        DB from time to time. -->
        <persistenceAdapter>
            <jdbcPersistenceAdapter 
dataDirectory="/home/as2/amq_storage/as2analyticsEmbedded" 
dataSource="#mssql-ds" useDatabaseLock="false">
                <adapter>
                        <imageBasedJDBCAdaptor/>
                </adapter>
            </jdbcPersistenceAdapter>
        </persistenceAdapter>

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="2gb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="80gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="10gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>
            <transportConnector name="as2analyticsEmbeddedConnector" 
uri="vm://as2analyticsEmbedded"/>
        </transportConnectors>

    </broker>

    <bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" 
destroy-method="close">
        <property name="driverClassName" 
value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
        <property name="url" 
value="jdbc:sqlserver://10.0.213.159:1433;databaseName=obfuscated"/>
        <property name="username" value="obfuscated"/>
        <property name="password" value="obfuscated"/>
        <property name="initialSize" value="1"/>
        <property name="maxActive" value="100"/>
        <property name="poolPreparedStatements" value="true"/>
    </bean>

</beans>

Broker B relevant configuration:

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
    http://activemq.apache.org/schema/core
    http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration 
file -->
    <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="analyticsCentral" dataDirectory="${activemq.base}/data"
            destroyApplicationContextOnStop="true">

        <!--
                        For better performances use VM cursor and small memory 
limit.
                        For more information, see:

            http://activemq.apache.org/message-cursors.html

            Also, if your producer is "hanging", it's probably due to producer 
flow control.
            For more information, see:
            http://activemq.apache.org/producer-flow-control.html
        -->

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry queue=">" producerFlowControl="false" 
memoryLimit="1gb">
                  <pendingQueuePolicy>
                    <fileQueueCursor/>
                  </pendingQueuePolicy>
                </policyEntry>
                <policyEntry queue=">">
                   <deadLetterStrategy>
                     <individualDeadLetterStrategy
                        queuePrefix="DLQ." useQueueForQueueMessages="true" />
                   </deadLetterStrategy>
               </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed 
in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default 
persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
                <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" 
dataSource="#mssql-ds" useDatabaseLock="false">
                <adapter>
                        <imageBasedJDBCAdaptor/>
                </adapter>
                </jdbcPersistenceAdapter>
        </persistenceAdapter>

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="2gb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="10gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="5gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>

    </broker>

        <amq:connectionFactory id="jmsFactory" brokerURL="tcp://localhost">
              <amq:redeliveryPolicy>
                <amq:redeliveryPolicy maximumRedeliveries="0"/>
              </amq:redeliveryPolicy>
        </amq:connectionFactory>

        <bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" 
destroy-method="close">
                <property name="driverClassName" 
value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
                <property name="url" 
value="jdbc:sqlserver://10.0.213.159:1433;DatabaseName=obfuscated"/>
                <property name="username" value="obfuscated"/>
                <property name="password" value="obfuscated"/>
                <property name="initialSize" value="1"/>
                <property name="maxActive" value="100"/>
                <property name="poolPreparedStatements" value="true"/>
        </bean>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        It also includes Camel (with its web console), see 
${ACTIVEMQ_HOME}/conf/camel.xml for more info

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
