[
https://issues.apache.org/jira/browse/AMQ-5934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14991745#comment-14991745
]
Devesh Arora edited comment on AMQ-5934 at 11/5/15 3:12 PM:
------------------------------------------------------------
[~gtully] thank you very much for your feedback - you're absolutely right. I
wasn't sure if it was intentional or not to have multiple threads access that
map. Now that I know, I'm going to dig a little further to figure out which
threads are trying to access it.
was (Author: [email protected]):
[~gtully] thank you very much for your feedback - you're absolutely right. I'm
going to dig a little further to figure out which threads are trying to access
this map.
> ActiveMQ durable topic subscribers not able to receive new messages after
> running for awhile
> --------------------------------------------------------------------------------------------
>
> Key: AMQ-5934
> URL: https://issues.apache.org/jira/browse/AMQ-5934
> Project: ActiveMQ
> Issue Type: Bug
> Affects Versions: 5.10.2, 5.11.1
> Environment: ActiveMQ 5.10.2 broker and a topic subscriber with
> ActiveMQ 5.10 client libraries.
> Reporter: Eric X
> Priority: Critical
> Attachments: AMQ-5934.patch, activemq.dump
>
>
> We have set up an environment which uses ActiveMQ 5.10.2 as broker and a test
> durable topic subscriber that connects to the broker using tcp protocol. The
> client connection string as follows:
> failover:(tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false,tcp://host1:61617?keepAlive=true&wireFormat.tightEncodingEnabled=false)?randomize=false&initialReconnectDelay=500&timeout=2000&useExponentialBackOff=false&jms.watchTopicAdvisories=false
> At first, we started five test clients (durable topic subscribers) with
> aforementioned connection string and make sure every subscriber is connected
> to the broker. After that, we use a test client to publish 1000000 messages
> to the topic that five clients subscribed. After running for a few hours,
> two of these five clients were hung (stopped receiving messages from the
> broker). In these two clients, there are 636786 and 93915 messages in
> pending queue, 1000 messages in dispatched queue. The other three received
> all the messages from topic. We have been able to consistently reproduce the
> issue. These stale clients appears connected to the broker but do not
> receive any new messages after they were hung.
> Can you someone look into this issue? It is very important for us. We have
> tried various configurations including jms.prefetchPolicy.topicPrefetch=1 but
> none of them are working.
> Here is the activemq conf file (activemq.xml) for this issue.
> <!--
> Licensed to the Apache Software Foundation (ASF) under one or more
> contributor license agreements. See the NOTICE file distributed with
> this work for additional information regarding copyright ownership.
> The ASF licenses this file to You under the Apache License, Version 2.0
> (the "License"); you may not use this file except in compliance with
> the License. You may obtain a copy of the License at
>
> http://www.apache.org/licenses/LICENSE-2.0
>
> Unless required by applicable law or agreed to in writing, software
> distributed under the License is distributed on an "AS IS" BASIS,
> WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> See the License for the specific language governing permissions and
> limitations under the License.
> -->
> <beans
> xmlns="http://www.springframework.org/schema/beans"
> xmlns:amq="http://activemq.apache.org/schema/core"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
> http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
> <!-- Allows us to use system properties as variables in this
> configuration file -->
> <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
> <property name="locations">
> <value>file:${activemq.base}/conf/credentials.properties</value>
> </property>
> </bean>
> <!--
> The <broker> element is used to configure the ActiveMQ broker.
> -->
> <broker xmlns="http://activemq.apache.org/schema/core"
> advisorySupport="false" useJmx="true" brokerName="dev.masterbroker-1"
> dataDirectory="${activemq.data}" >
> <!--
> The managementContext is used to configure how ActiveMQ is
> exposed in
> JMX. By default, ActiveMQ uses the MBean server that is started
> by
> the JVM. For more information, see:
>
> http://activemq.apache.org/jmx.html
> -->
> <managementContext>
> <managementContext createConnector="true" connectorPort="2099"/>
> </managementContext>
> <plugins>
> <jaasDualAuthenticationPlugin configuration="activemq-ssl-domain"/>
> <jaasDualAuthenticationPlugin configuration="activemq-domain"/>
>
> <!-- <simpleAuthenticationPlugin>
> <users>
> <authenticationUser username="system" password="manager"
> groups="users,admins"/>
> <authenticationUser username="user" password="password"
> groups="users"/>
> <authenticationUser username="guest" password="password"
> groups="guests"/>
> </users>
> </simpleAuthenticationPlugin> -->
>
> <authorizationPlugin>
> <map>
> <authorizationMap>
> <authorizationEntries>
> <authorizationEntry queue=">" read="admins,users,guests"
> write="admins,users,guests" admin="admins" />
> <authorizationEntry queue="USERS.>" read="users" write="users"
> admin="users" />
> <authorizationEntry queue="GUEST.>" read="guests"
> write="guests,users" admin="guests,users" />
>
> <authorizationEntry topic=">" read="admins,users,guests"
> write="admins,users,guests" admin="admins" />
> <authorizationEntry topic="USERS.>" read="users" write="users"
> admin="users" />
> <authorizationEntry topic="GUEST.>" read="guests"
> write="guests,users" admin="guests,users" />
>
> <authorizationEntry topic="ActiveMQ.Advisory.>"
> read="guests,users" write="guests,users" admin="guests,users"/>
> </authorizationEntries>
>
> <tempDestinationAuthorizationEntry>
> <tempDestinationAuthorizationEntry read="tempDestinationAdmins"
> write="tempDestinationAdmins" admin="tempDestinationAdmins"/>
> </tempDestinationAuthorizationEntry>
> </authorizationMap>
> </map>
> </authorizationPlugin>
> <redeliveryPlugin fallbackToDeadLetter="false"
> sendToDlqIfMaxRetriesExceeded="false">
> <redeliveryPolicyMap>
> <redeliveryPolicyMap>
> <redeliveryPolicyEntries>
> <!-- a destination specific policy -->
> <redeliveryPolicy topic="TP5"
> maximumRedeliveries="-1" redeliveryDelay="10000" />
> </redeliveryPolicyEntries>
> <!-- the fallback policy for all other destinations -->
> <defaultEntry>
> <redeliveryPolicy maximumRedeliveries="4"
> initialRedeliveryDelay="5000" redeliveryDelay="10000" />
> </defaultEntry>
> </redeliveryPolicyMap>
> </redeliveryPolicyMap>
> </redeliveryPlugin>
> </plugins>
> <!--
> Configure message persistence for the broker. The default
> persistence
> mechanism is the KahaDB store (identified by the kahaDB tag).
> For more information, see:
>
> http://activemq.apache.org/persistence.html
> -->
> <!--
> <persistenceAdapter>
> <kahaDB directory="${activemq.data}/kahadb"
> ignoreMissingJournalfiles="true"
> checkForCorruptJournalFiles="true"
> checksumJournalFiles="true"
> journalMaxFileLength="256mb"/>
> </persistenceAdapter>
> -->
> <persistenceAdapter>
> <mKahaDB directory="${activemq.data}/kahadb">
> <filteredPersistenceAdapters>
> <!-- match all queues -->
> <filteredKahaDB queue=">">
> <persistenceAdapter>
> <kahaDB
> journalMaxFileLength="64mb"
>
> ignoreMissingJournalfiles="true"
>
> checkForCorruptJournalFiles="true"
>
> checksumJournalFiles="true" />
> </persistenceAdapter>
> </filteredKahaDB>
> <!-- match all destinations -->
> <filteredKahaDB topic=">">
> <persistenceAdapter>
> <kahaDB
> journalMaxFileLength="64mb"
>
> ignoreMissingJournalfiles="true"
>
> checkForCorruptJournalFiles="true"
>
> checksumJournalFiles="true" />
> </persistenceAdapter>
> </filteredKahaDB>
> </filteredPersistenceAdapters>
> </mKahaDB>
> </persistenceAdapter>
> <!--
> It's advisable to turn on producer flow control in the production
> system
> The systemUsage controls the maximum amount of space the broker
> will
> use before slowing down producers. For more information, see:
>
> http://activemq.apache.org/producer-flow-control.html -->
>
> <destinationPolicy>
> <policyMap>
> <policyEntries>
> <policyEntry topic=">" optimizedDispatch="true"
> producerFlowControl="false" memoryLimit="200mb" expireMessagesPeriod="300000">
> <!-- The constantPendingMessageLimitStrategy is used to
> prevent
> slow topic consumers to block producers and affect
> other consumers
> by limiting the number of messages that are retained
> For more information, see:
>
> http://activemq.apache.org/slow-consumer-handling.html
> -->
> <pendingMessageLimitStrategy>
> <constantPendingMessageLimitStrategy limit="1000"/>
> </pendingMessageLimitStrategy>
> </policyEntry>
> <policyEntry queue=">" optimizedDispatch="true"
> producerFlowControl="false" memoryLimit="100mb">
> <!-- Use VM cursor for better latency
> For more information, see:
> http://activemq.apache.org/message-cursors.html
> <pendingQueuePolicy>
> <vmQueueCursor/>
> </pendingQueuePolicy>
> -->
> </policyEntry>
> <policyEntry queue="Que1" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <strictOrderDispatchPolicy/>
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy/>
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry queue="Que2" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <strictOrderDispatchPolicy/>
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy/>
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry queue="Que3" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <strictOrderDispatchPolicy/>
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy/>
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry queue="Que4" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <strictOrderDispatchPolicy/>
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy/>
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry queue="Que5" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <strictOrderDispatchPolicy/>
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy/>
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry queue="Que6" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <strictOrderDispatchPolicy/>
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy/>
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry queue="Que7" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <strictOrderDispatchPolicy/>
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy/>
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry queue="Que8" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <strictOrderDispatchPolicy/>
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy/>
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry queue="Que9" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <strictOrderDispatchPolicy/>
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy/>
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry queue="Que10" producerFlowControl="false"
> maxPageSize="5000" expireMessagesPeriod="300000" >
> <dispatchPolicy>
> <strictOrderDispatchPolicy/>
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy/>
> </subscriptionRecoveryPolicy>
> </policyEntry>
>
> <policyEntry topic="TP1" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <roundRobinDispatchPolicy />
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy />
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry topic="TP2" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <roundRobinDispatchPolicy />
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy />
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry topic="TP3" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <roundRobinDispatchPolicy />
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy />
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry topic="TP5" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <roundRobinDispatchPolicy />
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy />
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry topic="TP6" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <roundRobinDispatchPolicy />
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy />
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry topic="TP7" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <roundRobinDispatchPolicy />
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy />
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry topic="TP8" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <roundRobinDispatchPolicy />
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy />
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry topic="TP9" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <roundRobinDispatchPolicy />
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy />
> </subscriptionRecoveryPolicy>
> </policyEntry>
> <policyEntry topic="TP10" producerFlowControl="false"
> maxPageSize="5000" >
> <dispatchPolicy>
> <roundRobinDispatchPolicy />
> </dispatchPolicy>
> <subscriptionRecoveryPolicy>
> <lastImageSubscriptionRecoveryPolicy />
> </subscriptionRecoveryPolicy>
> </policyEntry>
> </policyEntries>
> </policyMap>
> </destinationPolicy>
> <destinations>
> <queue physicalName="Que1" />
> <queue physicalName="Que2" />
> <queue physicalName="Que3" />
> <queue physicalName="Que4" />
> <queue physicalName="Que5" />
> <queue physicalName="Que6" />
> <queue physicalName="Que7" />
> <queue physicalName="Que8" />
> <queue physicalName="Que9" />
> <queue physicalName="Que10" />
> <topic physicalName="TP1" />
> <topic physicalName="TP2" />
> <topic physicalName="TP3" />
> <topic physicalName="TP4" />
> <topic physicalName="TP5" />
> <topic physicalName="TP6" />
> <topic physicalName="TP7" />
> <topic physicalName="TP8" />
> <topic physicalName="TP9" />
> <topic physicalName="TP10" />
> </destinations>
>
> <!--
> The sslContext can be used to configure broker-specific SSL
> properties.
> -->
> <sslContext>
> <sslContext keyStore="file:${activemq.base}/conf/broker.ks"
> keyStorePassword="password"
> trustStore="file:${activemq.base}/conf/broker.ts"
> trustStorePassword="password"/>
> </sslContext>
>
> <plugins>
> <loggingBrokerPlugin logAll="false" logConnectionEvents="false"
> logSessionEvents="false"/>
> <timeStampingBrokerPlugin zeroExpirationOverride="0"
> ttlCeiling="0" futureOnly="true"/>
> <traceBrokerPathPlugin/>
> </plugins>
>
> <systemUsage>
> <systemUsage>
> <memoryUsage>
> <memoryUsage limit="6 gb"/>
> </memoryUsage>
> <storeUsage>
> <storeUsage limit="20 gb" name="foo"/>
> </storeUsage>
> <tempUsage>
> <tempUsage limit="2 gb"/>
> </tempUsage>
> </systemUsage>
> </systemUsage>
> <!-- The store and forward broker networks ActiveMQ will listen to -->
> <networkConnectors>
> <!-- <networkConnector name="default-nc"
> uri="multicast://default"/> -->
> </networkConnectors>
> <!--
> The transport connectors expose ActiveMQ over a given protocol to
> clients and other brokers. For more information, see:
>
> http://activemq.apache.org/configuring-transports.html
> -->
> <transportConnectors>
> <!-- DOS protection, limit concurrent connections to 1000 and
> frame size to 100MB -->
> <transportConnector name="openwire"
> uri="tcp://localhost:61636?wireFormat.maxInactivityDurationInitalDelay=30000"/>
>
> <transportConnector name="nio+ssl"
> uri="nio+ssl://localhost:61639?needClientAuth=true&wireFormat.maxInactivityDurationInitalDelay=30000"/>
> <transportConnector name="ssl"
> uri="ssl://localhost:61637?needClientAuth=true&wireFormat.maxInactivityDurationInitalDelay=30000"/>
> </transportConnectors>
> <!-- destroy the spring context on shutdown to stop jetty -->
> <shutdownHooks>
> <bean xmlns="http://www.springframework.org/schema/beans"
> class="org.apache.activemq.hooks.SpringContextHook" />
> </shutdownHooks>
> </broker>
> <!--
> <commandAgent xmlns="http://activemq.apache.org/schema/core"
> brokerUrl="vm://asb-jms-01-vm.eng.rr.com" username="system"
> password="password"/>
> -->
> <!--
> Uncomment to enable Camel
> Take a look at activemq-camel.xml for more details
> <import resource="activemq-camel.xml"/> -->
> <!--
> Enable web consoles, REST and Ajax APIs and demos
> Take a look at activemq-jetty.xml for more details -->
> <import resource="jetty.xml"/>
>
> </beans>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)