[
https://issues.apache.org/jira/browse/AMQ-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15407533#comment-15407533
]
Czeslaw commented on AMQ-6067:
------------------------------
Hi,
Stepping through version 5.13.2 in the debugger, I see that the
expireMessagesTask in org.apache.activemq.broker.region.Topic does not enforce
the max expire page size that it passes as a parameter:
    private final Runnable expireMessagesTask = new Runnable() {
        @Override
        public void run() {
            List<Message> browsedMessages = new InsertionCountList<Message>();
            doBrowse(browsedMessages, getMaxExpirePageSize());
        }
    };
In my case the max expire page size is at its default of 400. The doBrowse
method collects every message that belongs to the topic into browseList; max is
never checked directly, and the listener's hasSpace method is never called.
    private void doBrowse(final List<Message> browseList, final int max) {
        try {
            if (topicStore != null) {
                final List<Message> toExpire = new ArrayList<Message>();
                topicStore.recover(new MessageRecoveryListener() {
                    @Override
                    public boolean recoverMessage(Message message) throws Exception {
                        if (message.isExpired()) {
                            toExpire.add(message);
                        }
                        browseList.add(message);
                        return true;
                    }
                    @Override
                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                        return true;
                    }
                    @Override
                    public boolean hasSpace() {
                        return browseList.size() < max;
                    }
                    @Override
                    public boolean isDuplicate(MessageId id) {
                        return false;
                    }
                });
                final ConnectionContext connectionContext = createConnectionContext();
                ...
    }
In other words, if the ACTIVEMQ_MSGS table holds 1,000,000 messages for the
same topic, all of them are collected into memory (browseList), and this can
cause an OutOfMemoryError.
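To make the effect concrete, here is a minimal, self-contained sketch in plain Java. It is not ActiveMQ code: RecoveryListener, recoverIgnoringLimit, recoverHonoringLimit and browsedCount are hypothetical stand-ins for MessageRecoveryListener and the store's recover loop, and messages are reduced to integers. It assumes the store is the side that has to consult hasSpace(), since the listener in doBrowse always adds the message and returns true.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only, not ActiveMQ classes.
class HasSpaceDemo {

    /** Simplified analogue of MessageRecoveryListener. */
    interface RecoveryListener {
        boolean recoverMessage(int message);
        boolean hasSpace();
    }

    /** Store loop that never asks hasSpace(): every row reaches the listener. */
    static void recoverIgnoringLimit(int rows, RecoveryListener listener) {
        for (int i = 0; i < rows; i++) {
            if (!listener.recoverMessage(i)) {
                return;
            }
        }
    }

    /** Store loop that checks hasSpace() before handing over each row. */
    static void recoverHonoringLimit(int rows, RecoveryListener listener) {
        for (int i = 0; i < rows; i++) {
            if (!listener.hasSpace() || !listener.recoverMessage(i)) {
                return;
            }
        }
    }

    /** Browses 'rows' messages with a doBrowse-style listener and returns the list size. */
    static int browsedCount(int rows, final int max, boolean storeChecksHasSpace) {
        final List<Integer> browseList = new ArrayList<Integer>();
        RecoveryListener listener = new RecoveryListener() {
            @Override
            public boolean recoverMessage(int message) {
                browseList.add(message); // always added, as in doBrowse
                return true;             // never asks the store to stop
            }

            @Override
            public boolean hasSpace() {
                return browseList.size() < max;
            }
        };
        if (storeChecksHasSpace) {
            recoverHonoringLimit(rows, listener);
        } else {
            recoverIgnoringLimit(rows, listener);
        }
        return browseList.size();
    }

    public static void main(String[] args) {
        System.out.println("store ignores hasSpace: " + browsedCount(10000, 400, false));
        System.out.println("store checks hasSpace:  " + browsedCount(10000, 400, true));
    }
}
```

With 10000 rows and a limit of 400, the first call ends up holding all 10000 entries while the second stops at 400, which is the difference between the unbounded and the bounded browse.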
> OutOfMemoryError when expiring big amount of topic messages
> -----------------------------------------------------------
>
> Key: AMQ-6067
> URL: https://issues.apache.org/jira/browse/AMQ-6067
> Project: ActiveMQ
> Issue Type: Bug
> Components: JDBC
> Affects Versions: 5.10.0
> Reporter: Petr Havránek
> Labels: durable, durable_subscription, expiration, jdbc, timeToLive
>
> There is a problem in
> {noformat}
> org.apache.activemq.broker.region.Topic.expireMessagesTask
> {noformat}
> When a large number of topic messages are due to expire, this
> {{expireMessagesTask}} loads all of them into memory. This causes
> {noformat}
> 2015-11-24 11:05:46.359 WARN [ActiveMQ Broker[JmsEngineActivemqBroker] Scheduler] [Topic] Failed to browse Topic: test-topic
> java.lang.OutOfMemoryError: Java heap space
>     at oracle.sql.BLOB.getBytes(BLOB.java:204)
>     at oracle.jdbc.driver.T4CBlobAccessor.getBytes(T4CBlobAccessor.java:464)
>     at oracle.jdbc.driver.OracleResultSetImpl.getBytes(OracleResultSetImpl.java:676)
>     at org.apache.commons.dbcp.DelegatingResultSet.getBytes(DelegatingResultSet.java:203)
>     at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.getBinaryData(DefaultJDBCAdapter.java:80)
>     at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecover(DefaultJDBCAdapter.java:418)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:597)
>     at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
>     at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
>     at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
>     at org.springframework.aop.interceptor.AbstractTraceInterceptor.invoke(AbstractTraceInterceptor.java:113)
>     at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
>     at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
>     at $Proxy14.doRecover(Unknown Source)
>     at org.apache.activemq.store.jdbc.JDBCMessageStore.recover(JDBCMessageStore.java:236)
>     at org.apache.activemq.store.ProxyTopicMessageStore.recover(ProxyTopicMessageStore.java:62)
>     at org.apache.activemq.broker.region.Topic.doBrowse(Topic.java:594)
>     at org.apache.activemq.broker.region.Topic.access$100(Topic.java:65)
>     at org.apache.activemq.broker.region.Topic$6.run(Topic.java:733)
>     at org.apache.activemq.thread.SchedulerTimerTask.run(SchedulerTimerTask.java:33)
>     at java.util.TimerThread.mainLoop(Timer.java:512)
>     at java.util.TimerThread.run(Timer.java:462)
> {noformat}
> The problem occurs when using JDBC persistence with ActiveMQ 5.10.0. From a
> brief look at the source code, 5.12.1 appears to have the same problem.
> Test case:
> - run an ActiveMQ broker with JDBC persistence
> - create a subscription to a topic, but do not receive the messages
> - send a large number of messages with a short TimeToLive
> - when expireMessagesTask is scheduled, it tries to load all of the messages
>   and causes the OutOfMemoryError
> It would help if
> {noformat}
> org.apache.activemq.store.jdbc.JDBCMessageStore.recover(MessageRecoveryListener)
> {noformat}
> were updated like this:
> {code:java}
> public void recover(final MessageRecoveryListener listener) throws Exception {
>     // Get all the Message ids out of the database.
>     TransactionContext c = persistenceAdapter.getTransactionContext();
>     try {
>         c = persistenceAdapter.getTransactionContext();
>         adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
>             public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
>                 if (listener.hasSpace()) {
>                     Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
>                     msg.getMessageId().setBrokerSequenceId(sequenceId);
>                     return listener.recoverMessage(msg);
>                 } else {
>                     logger.debug("Recovery limit of the messages has exceeded.");
>                     return false;
>                 }
>             }
>             public boolean recoverMessageReference(String reference) throws Exception {
>                 if (listener.hasSpace()) {
>                     return listener.recoverMessageReference(new MessageId(reference));
>                 } else {
>                     logger.debug("Recovery limit of the message references has exceeded.");
>                     return false;
>                 }
>             }
>         });
>     } catch (SQLException e) {
>         JDBCPersistenceAdapter.log("JDBC Failure: ", e);
>         throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
>     } finally {
>         c.close();
>     }
> }
> {code}
> But I am not sure this limit is the best approach, because some messages
> that should be expired would then have to wait. A better solution might be to
> do this work in several separate transactions.
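
The idea of splitting the expiry scan into several separate transactions could look roughly like the sketch below. It is self-contained plain Java, not ActiveMQ code: PagedExpiry, expireInPages and the in-memory TreeMap keyed by sequence id are all hypothetical, sequence ids are assumed to be non-negative, and each loop iteration stands in for one short transaction that reads at most pageSize rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only, not ActiveMQ classes.
class PagedExpiry {

    /**
     * Scans the store in pages of at most pageSize rows and removes expired ones.
     * The store maps sequence id -> expiration timestamp (an assumed layout).
     * Returns the number of pages, i.e. simulated transactions, that were used.
     */
    static int expireInPages(TreeMap<Long, Long> store, int pageSize, long now) {
        long lastSeq = -1L; // resume point; assumes sequence ids are >= 0
        int pages = 0;
        while (true) {
            // "Begin transaction": read at most pageSize rows after lastSeq.
            List<Long> expired = new ArrayList<Long>();
            int read = 0;
            for (Map.Entry<Long, Long> row : store.tailMap(lastSeq, false).entrySet()) {
                if (read == pageSize) {
                    break;
                }
                read++;
                lastSeq = row.getKey();
                if (row.getValue() < now) {
                    expired.add(row.getKey());
                }
            }
            if (read == 0) {
                return pages; // nothing left to scan
            }
            // Remove this page's expired rows, then "commit".
            for (Long seq : expired) {
                store.remove(seq);
            }
            pages++;
        }
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> store = new TreeMap<Long, Long>();
        for (long seq = 0; seq < 1000; seq++) {
            // Even sequence ids expire at t=10, odd ones at t=1000.
            store.put(seq, seq % 2 == 0 ? 10L : 1000L);
        }
        int pages = expireInPages(store, 400, 100L);
        System.out.println("pages used: " + pages + ", messages left: " + store.size());
    }
}
```

In this sketch no more than pageSize rows are held in memory at once, and because the loop keeps going until the scan is complete, no expired message has to wait for the next scheduled run. Whether the real JDBC store can resume a scan from a sequence id in this way is an assumption here.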