[ 
https://issues.apache.org/jira/browse/AMQ-3591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13150538#comment-13150538
 ] 

Gary Tully commented on AMQ-3591:
---------------------------------

What does netstat say about both ends of the TCP connection? Is the connection 
half closed or aborted on one end, so that we are waiting on TCP write timeouts?
Also, using static: and failover: together is not recommended, because the 
failover transport hides transport errors from the network connector, sometimes 
leaving it in an inconsistent state. 
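
For illustration only, here is a minimal sketch of the reporter's Broker A 
networkConnector with the nested failover: wrapper removed, so that transport 
errors would reach the network connector directly. The host, port and all 
other attributes are copied unchanged from the quoted configuration below. 
Whether this change alone resolves the hang has not been verified.

```xml
<networkConnectors>
    <!-- static: discovery only; the failover: layer from the original
         URI is dropped so the bridge can see transport failures -->
    <networkConnector
        uri="static:(tcp://10.0.213.38:61616)"
        name="bridge"
        duplex="true"
        conduitSubscriptions="true"
        decreaseNetworkConsumerPriority="false"/>
</networkConnectors>
```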

                
> bridge transport is stuck on socket write at both ends
> ------------------------------------------------------
>
>                 Key: AMQ-3591
>                 URL: https://issues.apache.org/jira/browse/AMQ-3591
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.5.0
>         Environment: 2 redhat linux
>            Reporter: Leon Fleysher
>
> We have broker A embedded in a JBoss 7 web app on one Linux host, connecting 
> through a TCP duplex=true bridge to another, standalone broker B. Both 
> brokers exchange messages through 4 common queues. Each broker has its own 
> persistent store in an MSSQL database.
> After a few hours of flawless operation the bridge stops delivering messages 
> in both directions.
> Broker A offending thread stack trace (causing the rest to wait on the oneway mutex):
> BrokerService[as2analyticsEmbedded] Task-15 [RUNNABLE, IN_NATIVE] CPU time: 0:00
> java.net.SocketOutputStream.socketWrite0(FileDescriptor, byte[], int, int)
> java.net.SocketOutputStream.socketWrite(byte[], int, int)
> java.net.SocketOutputStream.write(byte[], int, int)
> org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(byte[], int, int)
> java.io.DataOutputStream.write(byte[], int, int)
> org.apache.activemq.openwire.v7.BaseDataStreamMarshaller.tightMarshalByteSequence2(ByteSequence, DataOutput, BooleanStream)
> org.apache.activemq.openwire.v7.MessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
> org.apache.activemq.openwire.v7.ActiveMQMessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
> org.apache.activemq.openwire.v7.ActiveMQObjectMessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
> org.apache.activemq.openwire.OpenWireFormat.marshal(Object, DataOutput)
> org.apache.activemq.transport.tcp.TcpTransport.oneway(Object)
> org.apache.activemq.transport.InactivityMonitor.oneway(Object)
> org.apache.activemq.transport.TransportFilter.oneway(Object)
> org.apache.activemq.transport.WireFormatNegotiator.oneway(Object)
> org.apache.activemq.transport.failover.FailoverTransport.oneway(Object)
> org.apache.activemq.transport.MutexTransport.oneway(Object)
> org.apache.activemq.transport.ResponseCorrelator.asyncRequest(Object, ResponseCallback)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(Command)
> org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(Object)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
> org.apache.activemq.transport.TransportFilter.onCommand(Object)
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport, TransportListener, Object)
> org.apache.activemq.transport.vm.VMTransport.oneway(Object)
> org.apache.activemq.transport.MutexTransport.oneway(Object)
> org.apache.activemq.transport.ResponseCorrelator.oneway(Object)
> org.apache.activemq.broker.TransportConnection.dispatch(Command)
> org.apache.activemq.broker.TransportConnection.processDispatch(Command)
> org.apache.activemq.broker.TransportConnection.iterate()
> org.apache.activemq.thread.PooledTaskRunner.runTask()
> org.apache.activemq.thread.PooledTaskRunner$1.run()
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
> java.util.concurrent.ThreadPoolExecutor$Worker.run()
> java.lang.Thread.run()
> Broker B offending thread stack trace (causing the rest to wait on the oneway mutex):
> ActiveMQ Connection Dispatcher: vm://analyticsCentral#6 [RUNNABLE, IN_NATIVE] CPU time: 0:00
> java.net.SocketOutputStream.socketWrite0(FileDescriptor, byte[], int, int)
> java.net.SocketOutputStream.socketWrite(byte[], int, int)
> java.net.SocketOutputStream.write(byte[], int, int)
> org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush()
> java.io.DataOutputStream.flush()
> org.apache.activemq.transport.tcp.TcpTransport.oneway(Object)
> org.apache.activemq.transport.InactivityMonitor.oneway(Object)
> org.apache.activemq.transport.TransportFilter.oneway(Object)
> org.apache.activemq.transport.WireFormatNegotiator.oneway(Object)
> org.apache.activemq.transport.MutexTransport.oneway(Object)
> org.apache.activemq.transport.ResponseCorrelator.asyncRequest(Object, ResponseCallback)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(Command)
> org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(Object)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
> org.apache.activemq.transport.TransportFilter.onCommand(Object)
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport, TransportListener, Object)
> org.apache.activemq.transport.vm.VMTransport.oneway(Object)
> org.apache.activemq.transport.MutexTransport.oneway(Object)
> org.apache.activemq.transport.ResponseCorrelator.oneway(Object)
> org.apache.activemq.broker.TransportConnection.dispatch(Command)
> org.apache.activemq.broker.TransportConnection.processDispatch(Command)
> org.apache.activemq.broker.TransportConnection.iterate()
> org.apache.activemq.thread.DedicatedTaskRunner.runTask()
> org.apache.activemq.thread.DedicatedTaskRunner$1.run()
> Both brokers run on a local 1 Gbit network. The problem always occurs after 
> a few hours of work and can be reproduced consistently.
> Broker A relevant configuration:
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.apache.org/schema/core"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans 
> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd 
> http://activemq.apache.org/schema/core 
> http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd"
> >
>     <broker xmlns="http://activemq.apache.org/schema/core" 
> brokerName="as2analyticsEmbedded" persistent="true" 
> dataDirectory="/home/as2/amq_storage" destroyApplicationContextOnStop="true">
>         <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                 <policyEntry queue=">" producerFlowControl="false" 
> memoryLimit="1gb">
>                   <pendingQueuePolicy>
>                     <fileQueueCursor/>
>                   </pendingQueuePolicy>
>                 </policyEntry>
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>         <managementContext>
>                 <managementContext createConnector="false"/>
>         </managementContext>
>         <networkConnectors>
>             <networkConnector 
> uri="static:(failover:(tcp://10.0.213.38:61616))"
>                 name="bridge"
>                 duplex="true"
>                 conduitSubscriptions="true"
>                 decreaseNetworkConsumerPriority="false">
>             </networkConnector>
>         </networkConnectors>
>         <!-- Both FS and DB must be provided. AMQ will use the DB for 
> long-term storage.
>         FS contents that have not been consumed yet are stored to the DB from 
> time to time. -->
>         <persistenceAdapter>
>             <jdbcPersistenceAdapter 
> dataDirectory="/home/as2/amq_storage/as2analyticsEmbedded" 
> dataSource="#mssql-ds" useDatabaseLock="false">
>                 <adapter>
>                         <imageBasedJDBCAdaptor/>
>                 </adapter>
>             </jdbcPersistenceAdapter>
>         </persistenceAdapter>
>         <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage limit="2gb"/>
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="80gb"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="10gb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>         <transportConnectors>
>             <transportConnector name="as2analyticsEmbeddedConnector" 
> uri="vm://as2analyticsEmbedded"/>
>         </transportConnectors>
>     </broker>
>     <bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" 
> destroy-method="close">
>         <property name="driverClassName" 
> value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
>         <property name="url" 
> value="jdbc:sqlserver://10.0.213.159:1433;databaseName=obfuscated"/>
>         <property name="username" value="obfuscated"/>
>         <property name="password" value="obfuscated"/>
>         <property name="initialSize" value="1"/>
>         <property name="maxActive" value="100"/>
>         <property name="poolPreparedStatements" value="true"/>
>     </bean>
> </beans>
> Broker B relevant configuration:
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.apache.org/schema/core"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans 
> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.apache.org/schema/core 
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>     <!-- Allows us to use system properties as variables in this 
> configuration file -->
>     <bean 
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         <property name="locations">
>             <value>file:${activemq.base}/conf/credentials.properties</value>
>         </property>
>     </bean>
>     <!--
>         The <broker> element is used to configure the ActiveMQ broker.
>     -->
>     <broker xmlns="http://activemq.apache.org/schema/core" 
> brokerName="analyticsCentral" dataDirectory="${activemq.base}/data" 
> destroyApplicationContextOnStop="true">
>         <!--
>                         For better performances use VM cursor and small 
> memory limit.
>                         For more information, see:
>             http://activemq.apache.org/message-cursors.html
>             Also, if your producer is "hanging", it's probably due to 
> producer flow control.
>             For more information, see:
>             http://activemq.apache.org/producer-flow-control.html
>         -->
>         <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                 <policyEntry queue=">" producerFlowControl="false" 
> memoryLimit="1gb">
>                   <pendingQueuePolicy>
>                     <fileQueueCursor/>
>                   </pendingQueuePolicy>
>                 </policyEntry>
>                 <policyEntry queue=">">
>                    <deadLetterStrategy>
>                      <individualDeadLetterStrategy
>                         queuePrefix="DLQ." useQueueForQueueMessages="true" />
>                    </deadLetterStrategy>
>                </policyEntry>
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>         <!--
>             The managementContext is used to configure how ActiveMQ is 
> exposed in
>             JMX. By default, ActiveMQ uses the MBean server that is started by
>             the JVM. For more information, see:
>             http://activemq.apache.org/jmx.html
>         -->
>         <managementContext>
>             <managementContext createConnector="false"/>
>         </managementContext>
>         <!--
>             Configure message persistence for the broker. The default 
> persistence
>             mechanism is the KahaDB store (identified by the kahaDB tag).
>             For more information, see:
>             http://activemq.apache.org/persistence.html
>         -->
>         <persistenceAdapter>
>                 <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" 
> dataSource="#mssql-ds" useDatabaseLock="false">
>                 <adapter>
>                         <imageBasedJDBCAdaptor/>
>                 </adapter>
>                 </jdbcPersistenceAdapter>
>         </persistenceAdapter>
>         <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage limit="2gb"/>
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="10gb"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="5gb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>         <!--
>             The transport connectors expose ActiveMQ over a given protocol to
>             clients and other brokers. For more information, see:
>             http://activemq.apache.org/configuring-transports.html
>         -->
>         <transportConnectors>
>             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
>         </transportConnectors>
>     </broker>
>         <amq:connectionFactory id="jmsFactory" brokerURL="tcp://localhost">
>               <amq:redeliveryPolicy>
>                 <amq:redeliveryPolicy maximumRedeliveries="0"/>
>               </amq:redeliveryPolicy>
>         </amq:connectionFactory>
>         <bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" 
> destroy-method="close">
>                 <property name="driverClassName" 
> value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
>                 <property name="url" 
> value="jdbc:sqlserver://10.0.213.159:1433;DatabaseName=obfuscated"/>
>                 <property name="username" value="obfuscated"/>
>                 <property name="password" value="obfuscated"/>
>                 <property name="initialSize" value="1"/>
>                 <property name="maxActive" value="100"/>
>                 <property name="poolPreparedStatements" value="true"/>
>         </bean>
>     <!--
>         Enable web consoles, REST and Ajax APIs and demos
>         It also includes Camel (with its web console), see 
> ${ACTIVEMQ_HOME}/conf/camel.xml for more info
>         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
>     -->
>     <import resource="jetty.xml"/>
> </beans>

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
