[ 
https://issues.apache.org/jira/browse/AMQ-5934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14991745#comment-14991745
 ] 

Devesh Arora edited comment on AMQ-5934 at 11/5/15 3:12 PM:
------------------------------------------------------------

[~gtully] thank you very much for your feedback - you're absolutely right.  I 
wasn't sure if it was intentional or not to have multiple threads access that 
map. Now that I know, I'm going to dig a little further to figure out which 
threads are trying to access it. 


was (Author: [email protected]):
[~gtully] thank you very much for your feedback - you're absolutely right. I'm 
going to dig a little further to figure out which threads are trying to access 
this map. 

> ActiveMQ durable topic subscribers not able to receive new messages after 
> running for awhile
> --------------------------------------------------------------------------------------------
>
>                 Key: AMQ-5934
>                 URL: https://issues.apache.org/jira/browse/AMQ-5934
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.10.2, 5.11.1
>         Environment: ActiveMQ 5.10.2 broker and a topic subscriber with 
> ActiveMQ 5.10 client libraries.
>            Reporter: Eric X
>            Priority: Critical
>         Attachments: AMQ-5934.patch, activemq.dump
>
>
> We have set up an environment which uses ActiveMQ 5.10.2 as the broker and a test 
> durable topic subscriber that connects to the broker using the TCP protocol.  The 
> client connection string is as follows:
> failover:(tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false,tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false)?randomize=false&initialReconnectDelay=500&timeout=2000&useExponentialBackOff=false&jms.watchTopicAdvisories=false
> At first, we started five test clients (durable topic subscribers) with the 
> aforementioned connection string and made sure every subscriber was connected 
> to the broker.  After that, we used a test client to publish 1000000 messages 
> to the topic that the five clients subscribed to.  After running for a few hours, 
> two of these five clients hung (stopped receiving messages from the 
> broker).  For these two clients, there were 636786 and 93915 messages in the 
> pending queue and 1000 messages in the dispatched queue.  The other three received 
> all the messages from the topic.  We have been able to consistently reproduce the 
> issue.  These stale clients appear connected to the broker but do not 
> receive any new messages after they hang.
> Can someone look into this issue?  It is very important for us.  We have 
> tried various configurations including jms.prefetchPolicy.topicPrefetch=1 but 
> none of them work.
> Here is the ActiveMQ configuration file (activemq.xml) for this issue.
>  <!--
>     Licensed to the Apache Software Foundation (ASF) under one or more
>     contributor license agreements.  See the NOTICE file distributed with
>     this work for additional information regarding copyright ownership.
>     The ASF licenses this file to You under the Apache License, Version 2.0
>     (the "License"); you may not use this file except in compliance with
>     the License.  You may obtain a copy of the License at
>    
>     http://www.apache.org/licenses/LICENSE-2.0
>    
>     Unless required by applicable law or agreed to in writing, software
>     distributed under the License is distributed on an "AS IS" BASIS,
>     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>     See the License for the specific language governing permissions and
>     limitations under the License.
> -->
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.apache.org/schema/core"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans 
> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.apache.org/schema/core 
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>     <!-- Allows us to use system properties as variables in this 
> configuration file -->
>     <bean 
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         <property name="locations">
>             <value>file:${activemq.base}/conf/credentials.properties</value> 
>         </property>      
>     </bean> 
>     <!-- 
>         The <broker> element is used to configure the ActiveMQ broker. 
>     -->
>     <broker xmlns="http://activemq.apache.org/schema/core" 
> advisorySupport="false" useJmx="true" brokerName="dev.masterbroker-1" 
> dataDirectory="${activemq.data}" >
>         <!-- 
>             The managementContext is used to configure how ActiveMQ is 
> exposed in 
>             JMX. By default, ActiveMQ uses the MBean server that is started 
> by 
>             the JVM. For more information, see: 
>             
>             http://activemq.apache.org/jmx.html 
>         -->
>         <managementContext>
>             <managementContext createConnector="true" connectorPort="2099"/>
>         </managementContext>
>       <plugins>
>         <jaasDualAuthenticationPlugin configuration="activemq-ssl-domain"/>
>         <jaasDualAuthenticationPlugin configuration="activemq-domain"/>
>  
>         <!-- <simpleAuthenticationPlugin>
>       <users>
>               <authenticationUser username="system" password="manager"
>                       groups="users,admins"/>
>               <authenticationUser username="user" password="password"
>                       groups="users"/>
>               <authenticationUser username="guest" password="password" 
> groups="guests"/>
>       </users>
>       </simpleAuthenticationPlugin>   -->
>  
>       <authorizationPlugin>
>         <map>
>           <authorizationMap>
>             <authorizationEntries>
>               <authorizationEntry queue=">" read="admins,users,guests" 
> write="admins,users,guests" admin="admins" />
>               <authorizationEntry queue="USERS.>" read="users" write="users" 
> admin="users" />
>               <authorizationEntry queue="GUEST.>" read="guests" 
> write="guests,users" admin="guests,users" />
>               
>               <authorizationEntry topic=">" read="admins,users,guests" 
> write="admins,users,guests" admin="admins" />
>               <authorizationEntry topic="USERS.>" read="users" write="users" 
> admin="users" />
>               <authorizationEntry topic="GUEST.>" read="guests" 
> write="guests,users" admin="guests,users" />
>               
>               <authorizationEntry topic="ActiveMQ.Advisory.>" 
> read="guests,users" write="guests,users" admin="guests,users"/>
>             </authorizationEntries>
>             
>             <tempDestinationAuthorizationEntry>  
>              <tempDestinationAuthorizationEntry read="tempDestinationAdmins" 
> write="tempDestinationAdmins" admin="tempDestinationAdmins"/>
>            </tempDestinationAuthorizationEntry>               
>           </authorizationMap>
>         </map>
>       </authorizationPlugin>
>       <redeliveryPlugin fallbackToDeadLetter="false" 
> sendToDlqIfMaxRetriesExceeded="false">
>               <redeliveryPolicyMap>
>                   <redeliveryPolicyMap>
>                       <redeliveryPolicyEntries>
>                           <!-- a destination specific policy -->
>                           <redeliveryPolicy topic="TP5" 
> maximumRedeliveries="-1" redeliveryDelay="10000" />
>                       </redeliveryPolicyEntries>
>                       <!-- the fallback policy for all other destinations -->
>                       <defaultEntry>
>                           <redeliveryPolicy maximumRedeliveries="4" 
> initialRedeliveryDelay="5000" redeliveryDelay="10000" />
>                       </defaultEntry>
>                   </redeliveryPolicyMap>
>               </redeliveryPolicyMap>
>        </redeliveryPlugin>
>       </plugins> 
>         <!-- 
>             Configure message persistence for the broker. The default 
> persistence
>             mechanism is the KahaDB store (identified by the kahaDB tag). 
>             For more information, see: 
>             
>             http://activemq.apache.org/persistence.html 
>         -->
>         <!--
>         <persistenceAdapter>
>                           <kahaDB directory="${activemq.data}/kahadb"
>                                   ignoreMissingJournalfiles="true"
>                                   checkForCorruptJournalFiles="true"
>                                   checksumJournalFiles="true"
>                                   journalMaxFileLength="256mb"/>
>         </persistenceAdapter>
>         -->
>         <persistenceAdapter>
>                 <mKahaDB directory="${activemq.data}/kahadb">
>                         <filteredPersistenceAdapters>
>                                 <!-- match all queues -->
>                                 <filteredKahaDB queue=">">
>                                         <persistenceAdapter>
>                                                 <kahaDB 
> journalMaxFileLength="64mb"
>                                                         
> ignoreMissingJournalfiles="true"
>                                                         
> checkForCorruptJournalFiles="true"
>                                                         
> checksumJournalFiles="true" />
>                                         </persistenceAdapter>
>                                 </filteredKahaDB>
>                                 <!-- match all topics -->
>                                 <filteredKahaDB topic=">">
>                                         <persistenceAdapter>
>                                                 <kahaDB 
> journalMaxFileLength="64mb"
>                                                         
> ignoreMissingJournalfiles="true"
>                                                         
> checkForCorruptJournalFiles="true"
>                                                         
> checksumJournalFiles="true" />
>                                         </persistenceAdapter>
>                                 </filteredKahaDB>
>                         </filteredPersistenceAdapters>
>                 </mKahaDB>
>         </persistenceAdapter>
>         <!--
>             It's advisable to turn on producer flow control in the production 
> system
>             The systemUsage controls the maximum amount of space the broker 
> will 
>             use before slowing down producers. For more information, see:
>             
>             http://activemq.apache.org/producer-flow-control.html -->
>               
>            <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                 <policyEntry topic=">" optimizedDispatch="true" 
> producerFlowControl="false" memoryLimit="200mb" expireMessagesPeriod="300000">
>                     <!-- The constantPendingMessageLimitStrategy is used to 
> prevent
>                          slow topic consumers from blocking producers and affecting 
> other consumers
>                          by limiting the number of messages that are retained.
>                          For more information, see:
>                          
> http://activemq.apache.org/slow-consumer-handling.html
>                     -->
>                   <pendingMessageLimitStrategy>
>                     <constantPendingMessageLimitStrategy limit="1000"/>
>                   </pendingMessageLimitStrategy>
>                 </policyEntry>
>                 <policyEntry queue=">" optimizedDispatch="true" 
> producerFlowControl="false" memoryLimit="100mb">
>                   <!-- Use VM cursor for better latency
>                        For more information, see:
>                        http://activemq.apache.org/message-cursors.html
>                   <pendingQueuePolicy>
>                     <vmQueueCursor/>
>                   </pendingQueuePolicy>
>                   -->
>                 </policyEntry>
>            <policyEntry queue="Que1" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que2" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que3" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>          <policyEntry queue="Que4" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que5" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que6" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que7" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que8" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que9" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry queue="Que10" producerFlowControl="false" 
> maxPageSize="5000"  expireMessagesPeriod="300000" >
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           
>           <policyEntry topic="TP1" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP2" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP3" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP5" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP6" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP7" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP8" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP9" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>           <policyEntry topic="TP10" producerFlowControl="false" 
> maxPageSize="5000" >
>             <dispatchPolicy>
>               <roundRobinDispatchPolicy />
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy />
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>     <destinations>
>       <queue physicalName="Que1" />
>       <queue physicalName="Que2" />
>       <queue physicalName="Que3" />
>       <queue physicalName="Que4" />
>       <queue physicalName="Que5" />
>       <queue physicalName="Que6" />
>       <queue physicalName="Que7" />
>       <queue physicalName="Que8" />
>       <queue physicalName="Que9" />
>       <queue physicalName="Que10" />
>       <topic physicalName="TP1" />
>       <topic physicalName="TP2" />
>       <topic physicalName="TP3" />
>       <topic physicalName="TP4" />
>       <topic physicalName="TP5" />
>       <topic physicalName="TP6" />
>       <topic physicalName="TP7" />
>       <topic physicalName="TP8" />
>       <topic physicalName="TP9" />
>       <topic physicalName="TP10" />
>     </destinations>
>  
>         <!--
>             The sslContext can be used to configure broker-specific SSL 
> properties.
>         -->
>         <sslContext>
>             <sslContext keyStore="file:${activemq.base}/conf/broker.ks"
>               keyStorePassword="password" 
> trustStore="file:${activemq.base}/conf/broker.ts"
>               trustStorePassword="password"/>
>         </sslContext>
>       
>         <plugins>
>             <loggingBrokerPlugin logAll="false" logConnectionEvents="false" 
> logSessionEvents="false"/>
>             <timeStampingBrokerPlugin zeroExpirationOverride="0" 
> ttlCeiling="0" futureOnly="true"/>
>             <traceBrokerPathPlugin/>
>         </plugins>
>   
>         <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage limit="6 gb"/>
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="20 gb" name="foo"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="2 gb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>         <!-- The store and forward broker networks ActiveMQ will listen to -->
>         <networkConnectors>
>             <!-- <networkConnector name="default-nc" 
> uri="multicast://default"/> -->
>         </networkConnectors>
>         <!-- 
>             The transport connectors expose ActiveMQ over a given protocol to
>             clients and other brokers. For more information, see: 
>             
>             http://activemq.apache.org/configuring-transports.html 
>         -->
>         <transportConnectors>
>             <!-- DOS protection, limit concurrent connections to 1000 and 
> frame size to 100MB -->
>             <transportConnector name="openwire" 
> uri="tcp://localhost:61636?wireFormat.maxInactivityDurationInitalDelay=30000"/>
>             
>                <transportConnector name="nio+ssl" 
> uri="nio+ssl://localhost:61639?needClientAuth=true&amp;wireFormat.maxInactivityDurationInitalDelay=30000"/>
>             <transportConnector name="ssl" 
> uri="ssl://localhost:61637?needClientAuth=true&amp;wireFormat.maxInactivityDurationInitalDelay=30000"/>
>         </transportConnectors>
>         <!-- destroy the spring context on shutdown to stop jetty -->
>         <shutdownHooks>
>             <bean xmlns="http://www.springframework.org/schema/beans" 
> class="org.apache.activemq.hooks.SpringContextHook" />
>         </shutdownHooks>
>     </broker>
> <!--
>     <commandAgent xmlns="http://activemq.apache.org/schema/core" 
> brokerUrl="vm://asb-jms-01-vm.eng.rr.com" username="system" 
> password="password"/>
> -->
>     <!-- 
>         Uncomment to enable Camel
>         Take a look at activemq-camel.xml for more details
>     <import resource="activemq-camel.xml"/> -->
>     <!-- 
>         Enable web consoles, REST and Ajax APIs and demos
>         Take a look at activemq-jetty.xml for more details -->
>     <import resource="jetty.xml"/> 
>     
> </beans>
>     
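
For anyone reproducing the report above, it may help to see how the client connection string breaks down. The sketch below is not part of the original report and does not use any ActiveMQ API. It is plain string handling, and the class and method names (FailoverUriBreakdown, nestedUris, outerOptions) are hypothetical. It assumes the usual failover URI layout: options inside the parentheses belong to each nested tcp transport, options after the closing parenthesis belong to the failover transport, and options prefixed with "jms." are applied to the connection factory.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FailoverUriBreakdown {

    /** Splits "k1=v1&k2=v2" into an ordered map of option name to value. */
    public static Map<String, String> parseQuery(String query) {
        Map<String, String> options = new LinkedHashMap<>();
        if (query == null || query.isEmpty()) {
            return options;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                options.put(pair, "");
            } else {
                options.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return options;
    }

    /** Returns the nested transport URIs listed between the parentheses. */
    public static List<String> nestedUris(String failoverUri) {
        int open = failoverUri.indexOf('(');
        int close = failoverUri.lastIndexOf(')');
        List<String> uris = new ArrayList<>();
        for (String uri : failoverUri.substring(open + 1, close).split(",")) {
            uris.add(uri.trim());
        }
        return uris;
    }

    /** Returns the options that follow the closing parenthesis. */
    public static Map<String, String> outerOptions(String failoverUri) {
        String rest = failoverUri.substring(failoverUri.lastIndexOf(')') + 1);
        return parseQuery(rest.startsWith("?") ? rest.substring(1) : rest);
    }

    public static void main(String[] args) {
        // Connection string copied from the report.
        String uri = "failover:(tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false,"
                + "tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false)"
                + "?randomize=false&initialReconnectDelay=500&timeout=2000"
                + "&useExponentialBackOff=false&jms.watchTopicAdvisories=false";

        // Per-transport options (inside the parentheses).
        for (String nested : nestedUris(uri)) {
            int q = nested.indexOf('?');
            String base = q < 0 ? nested : nested.substring(0, q);
            String query = q < 0 ? "" : nested.substring(q + 1);
            System.out.println("transport " + base + " -> " + parseQuery(query));
        }

        // Options after the parentheses: failover-level or connection-factory-level.
        outerOptions(uri).forEach((name, value) -> System.out.println(
                (name.startsWith("jms.") ? "connection factory: " : "failover: ")
                        + name + "=" + value));
    }
}
```

Under the same assumption, an extra setting such as the jms.prefetchPolicy.topicPrefetch=1 mentioned in the report would be appended to the outer query string, after the closing parenthesis, rather than to the nested tcp URIs.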



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
