[ https://issues.apache.org/jira/browse/AMQ-3728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Marek Slama updated AMQ-3728:
-----------------------------
Description:
I have the following configuration:
1. One producer.
2. One broker on the same machine as the producer.
3. One offline durable subscriber, but I have enough storage space, so it should not overflow.
I run our import test, which sends about 10 messages/s. After some hours of running I start to get many of these:
{code}
javax.jms.JMSException: org.apache.activemq.transport.RequestTimedOutIOException
{code}
(I set the send timeout to 2 s to avoid blocking indefinitely if the broker storage is full. So my first question is why this happens when there should be enough space. My broker configuration is below.)
Then I get:
{code}
javax.jms.JMSException: Cannot send, channel has already failed: usfr-cmsnpdev.insideidc.com/10.1.4.42:61616
javax.jms.JMSException: Cannot send, channel has already failed: usfr-cmsnpdev.insideidc.com/10.1.4.42:61616
(ActiveMQ Connection Executor: tcp://usfr-cmsnpdev.insideidc.com/10.1.4.42:61616) Failed to send message.: javax.jms.JMSException: Broken pipe
{code}
followed by some more:
{code}
javax.jms.JMSException: Cannot send, channel has already failed: usfr-cmsnpdev.insideidc.com/10.1.4.42:61616
{code}
and after that, all the time:
{code}
javax.jms.IllegalStateException: The Session is closed
{code}
My producer code is simple. It is a Spring bean. I create the connection and session when the bean is created. For every message sent I create a producer and close it:
{code}
//Init code
connection = connectionFactory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
    public void onException(JMSException ex) {
        logger.error("Failed to send message.", ex);
    }
});
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
....
//Send code
private void sendMessage(String xmlMessage, Topic topic, String containerId) {
    MessageProducer producer = null;
    try {
        // Create the producer.
        producer = session.createProducer(topic);
        TextMessage txtMessage = session.createTextMessage(xmlMessage);
        producer.send(txtMessage);
    } catch (JMSException ex) {
        logger.error("", ex);
    } finally {
        try {
            if (producer != null) {
                producer.close();
            }
        } catch (JMSException ex) {
        }
    }
}
{code}
I use the default connection factory:
{code}
<bean id="idc_metacop_jmsFactory.prototype"
      class="org.apache.activemq.ActiveMQConnectionFactory"
      abstract="true">
    <property name="brokerURL" value="${metacop.jms.brokerURL}"/>
    <property name="sendTimeout" value="2000"/>
</bean>
{code}
Log from broker:
{code}
2012-02-17 21:55:40,389 | INFO | Transport failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: /10.1.4.42:56166 | org.apache.activemq.broker.TransportConnection.Transport | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@fcc9c76
2012-02-17 21:55:42,445 | INFO | Transport failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: /10.1.4.42:39395 | org.apache.activemq.broker.TransportConnection.Transport | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@fcc9c76
{code}
Broker configuration:
{code}
<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="localhost" dataDirectory="${activemq.base}/data"
        destroyApplicationContextOnStop="true">
    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry topic=">" producerFlowControl="true"
                             memoryLimit="200mb">
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true"
                             memoryLimit="200mb">
                </policyEntry>
            </policyEntries>
        </policyMap>
    </destinationPolicy>
    <managementContext>
        <managementContext createConnector="false"/>
    </managementContext>
    <persistenceAdapter>
        <kahaDB directory="${activemq.base}/data/kahadb"/>
    </persistenceAdapter>
    <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage limit="500 mb"/>
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="2 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="2 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>
    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
    </transportConnectors>
</broker>
{code}
I can provide full call stacks if necessary. One thing that comes to mind: when I have just one connection, is a call to producer.send thread-safe? I do not have any synchronization there.
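On the thread-safety question: the JMS specification treats a Connection as safe for concurrent use, but a Session (and the producers created from it) as a single-threaded context, so sharing one session across sending threads without synchronization is not supported. A minimal sketch of one way to serialize the sends follows. It is not the code from this report: `MessageSink` is a hypothetical stand-in for the shared session/producer so that the example runs without a broker, and `SerializedSender`, `sendLock`, and `demo` are illustrative names. It uses Java 8 lambdas for brevity, which the JDK 6 environment of this report would not compile.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SerializedSender {
    /** Hypothetical stand-in for the shared JMS session/producer; not an ActiveMQ type. */
    interface MessageSink {
        void send(String text);
    }

    private final Object sendLock = new Object();
    private final MessageSink sink;

    SerializedSender(MessageSink sink) {
        this.sink = sink;
    }

    /** Only one thread at a time may touch the non-thread-safe sink. */
    void sendMessage(String xmlMessage) {
        synchronized (sendLock) {
            sink.send(xmlMessage);
        }
    }

    /** Sends from several threads and returns how many messages the sink received. */
    public static int demo(int threads, int perThread) {
        // ArrayList is deliberately not thread-safe, like a JMS Session.
        List<String> received = new ArrayList<>();
        SerializedSender sender = new SerializedSender(received::add);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    sender.sendMessage("<msg thread='" + id + "' n='" + i + "'/>");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("senders did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received.size();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000)); // prints 4000
    }
}
```

In the real bean, the same effect could be had by guarding the body of `sendMessage` with a lock, or by giving each sending thread its own session.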
BTW, is there any way to purge even undelivered messages from the broker storage? Or to monitor the number of messages in storage? I tried deleting the offline durable subscribers from the Web Admin, but the data folder size does not change, so I cannot tell whether the messages were really removed from storage (and the db is just not compacted) or whether they remain in storage.
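On monitoring the store: one possible approach, stated here as an assumption rather than something verified on this broker, is to expose the broker's JMX MBeans and inspect them with a JMX client such as jconsole. The configuration above sets `createConnector="false"`; a sketch of the changed fragment follows. The port 1099 is an illustrative choice.

```xml
<managementContext>
    <!-- Assumption: start a JMX connector so that a remote JMX client can attach.
         1099 is an illustrative port, not a value taken from this report. -->
    <managementContext createConnector="true" connectorPort="1099"/>
</managementContext>
```

With the connector enabled, the broker, destination, and durable-subscription MBeans can be browsed, which is one way to see pending message counts instead of inferring them from the size of the data folder.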
> Broker stops to accept messages
> -------------------------------
>
> Key: AMQ-3728
> URL: https://issues.apache.org/jira/browse/AMQ-3728
> Project: ActiveMQ
> Issue Type: Bug
> Affects Versions: 5.5.0
> Environment: Linux, 64bit, JDK 6u30 64bit
> Reporter: Marek Slama