[
https://issues.apache.org/jira/browse/AMQ-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15413093#comment-15413093
]
Czeslaw commented on AMQ-6067:
------------------------------
Hi,
I see another issue in amq 5.13.2 related with topic message expiration. I
have one producer which send me messages to MyTopic and one durable
subscriber/consumer. Messages are stored in DB and after 30 seconds amq broker
trying to expire these messages ( I have configuration entry for this:
<amq:policyEntry topic="MyTopic.>" expireMessagesPeriod="30000"> ). I’m getting
NullPointerException from AdvisoryBroker and messages are never expired and
remain in activemq_msgs table.
java.lang.NullPointerException
at
org.apache.activemq.advisory.AdvisoryBroker.messageExpired(AdvisoryBroker.java:430)
at
org.apache.activemq.broker.BrokerFilter.messageExpired(BrokerFilter.java:313)
at
org.apache.activemq.broker.BrokerFilter.messageExpired(BrokerFilter.java:313)
at
org.apache.activemq.broker.BrokerFilter.messageExpired(BrokerFilter.java:313)
at
org.apache.activemq.broker.MutableBrokerFilter.messageExpired(MutableBrokerFilter.java:325)
at
org.apache.activemq.broker.region.Topic.messageExpired(Topic.java:776)
at org.apache.activemq.broker.region.Topic.doBrowse(Topic.java:660)
Under debugger I see that in
org.apache.activemq.advisory.AdvisoryBroker.messageExpired baseDestination is
null and call baseDestination.getActiveMQDestination() generates NPE
@Override
public void messageExpired(ConnectionContext context, MessageReference
messageReference, Subscription subscription) {
super.messageExpired(context, messageReference, subscription);
try {
if (!messageReference.isAdvisory()) {
BaseDestination baseDestination = (BaseDestination)
messageReference.getMessage().getRegionDestination();
ActiveMQTopic topic =
AdvisorySupport.getExpiredMessageTopic(baseDestination.getActiveMQDestination());
Below is code how messages are produced:
public static class Producer implements Runnable {
public void run() {
try {
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(BROKER_ULR);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic("MyTopic");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.setTimeToLive(30000);
for(int i =0; i < 10; i ++) {
String text = "Message number " + i + " From: " +
Thread.currentThread().getName();
TextMessage message = session.createTextMessage(text);
message.setJMSDestination(destination);
producer.send(destination, message);
Thread.sleep(10000);
}
// Clean up
session.close();
connection.close();
}
catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
}
Under debugger I see that setter method setRegionDestination
(org.apache.activemq.command.Message ) is called in
org.apache.activemq.broker.region.Topic class before message is send:
@Override
public void send(final ProducerBrokerExchange producerExchange, final Message
message) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
final ProducerInfo producerInfo =
producerExchange.getProducerState().getInfo();
producerExchange.incrementSend();
final boolean sendProducerAck = !message.isResponseRequired() &&
producerInfo.getWindowSize() > 0
&& !context.isInRecoveryMode();
message.setRegionDestination(this);
I don’t see call this method when message is loaded.
Do you have any idea what can be wrong ? Is it issue in my messages (something
is missing) or it is another bug in amq code ?
> OutOfMemoryError when expiring big amount of topic messages
> -----------------------------------------------------------
>
> Key: AMQ-6067
> URL: https://issues.apache.org/jira/browse/AMQ-6067
> Project: ActiveMQ
> Issue Type: Bug
> Components: JDBC
> Affects Versions: 5.10.0
> Reporter: Petr Havránek
> Labels: durable, durable_subscription, expiration, jdbc,
> timeToLive,
>
> There is a problem in
> {noformat}
> org.apache.activemq.broker.region.Topic.expireMessagesTask
> {noformat}
> When there are big amount of topic messages that are going to be expired,
> this {{expireMessagesTask}} loads all of the messages to memory. This causes
> {noformat}
> 2015-11-24 11:05:46.359 WARN [ActiveMQ Broker[JmsEngineActivemqBroker]
> Scheduler] [Topic] Failed to browse Topic: test-topic
> java.lang.OutOfMemoryError: Java heap space
> at oracle.sql.BLOB.getBytes(BLOB.java:204)
> at oracle.jdbc.driver.T4CBlobAccessor.getBytes(T4CBlobAccessor.java:464)
> at
> oracle.jdbc.driver.OracleResultSetImpl.getBytes(OracleResultSetImpl.java:676)
> at
> org.apache.commons.dbcp.DelegatingResultSet.getBytes(DelegatingResultSet.java:203)
> at
> org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.getBinaryData(DefaultJDBCAdapter.java:80)
> at
> org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecover(DefaultJDBCAdapter.java:418)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at
> org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
> at
> org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
> at
> org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
> at
> org.springframework.aop.interceptor.AbstractTraceInterceptor.invoke(AbstractTraceInterceptor.java:113)
> at
> org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
> at
> org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
> at $Proxy14.doRecover(Unknown Source)
> at
> org.apache.activemq.store.jdbc.JDBCMessageStore.recover(JDBCMessageStore.java:236)
> at
> org.apache.activemq.store.ProxyTopicMessageStore.recover(ProxyTopicMessageStore.java:62)
> at org.apache.activemq.broker.region.Topic.doBrowse(Topic.java:594)
> at org.apache.activemq.broker.region.Topic.access$100(Topic.java:65)
> at org.apache.activemq.broker.region.Topic$6.run(Topic.java:733)
> at
> org.apache.activemq.thread.SchedulerTimerTask.run(SchedulerTimerTask.java:33)
> at java.util.TimerThread.mainLoop(Timer.java:512)
> at java.util.TimerThread.run(Timer.java:462)
> {noformat}
> The problem happens when using JDBC persistency with ActiveMQ 5.10.0. After a
> short look to source code, the same problem could be also with 5.12.1.
> Test case:
> - run ActiveMQ broker with JDBC persistency
> - create subscription to a topic, but do not receive the messages
> - send enough number of messages with short TimeToLive
> - when expireMessagesTask is scheduled, it tries to load all of the messages
> and causes the OutOfMemoryError
> It would be fine if
> {noformat}
> org.apache.activemq.store.jdbc.JDBCMessageStore.recover(MessageRecoveryListener)
> {noformat}
> will be updated like this:
> {code:java}
> public void recover(final MessageRecoveryListener listener) throws Exception {
> // Get all the Message ids out of the database.
> TransactionContext c = persistenceAdapter.getTransactionContext();
> try {
> c = persistenceAdapter.getTransactionContext();
> adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
> public boolean recoverMessage(long sequenceId, byte[] data) throws
> Exception {
> if (listener.hasSpace()) {
> Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
> msg.getMessageId().setBrokerSequenceId(sequenceId);
> return listener.recoverMessage(msg);
> } else {
> logger.debug("Recovery limit of the messages has exceeded.");
> return false;
> }
> }
> public boolean recoverMessageReference(String reference) throws
> Exception {
> if (listener.hasSpace()) {
> return listener.recoverMessageReference(new MessageId(reference));
> } else {
> logger.debug("Recovery limit of the message references has
> exceeded.");
> return false;
> }
> }
> });
> } catch (SQLException e) {
> JDBCPersistenceAdapter.log("JDBC Failure: ", e);
> throw IOExceptionSupport.create("Failed to recover container. Reason: " +
> e, e);
> } finally {
> c.close();
> }
> }
> {code}
> But I am not sure if this limitation is the best way, because there will be
> some messages that should be expired, but need to wait. So better solution
> might be to do this job in more separated transactions.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)