[
https://issues.apache.org/jira/browse/AMQ-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Reinald Verheij updated AMQ-7394:
---------------------------------
Description:
I believe this affects versions up to 5.15.11 too, since I don't see any
changes in the related code.
At Infor we currently use ActiveMQ 5.15.2 with the JDBC Persistence Adapter.
When a queue (i.e. the DB tables backing the queue) holds many messages of a
significant size (e.g. 200 messages of 5 MB each) and I start the BrokerService,
it will 'recover' the messages. The first call fetches 400 messages; as those
are consumed and removed, each subsequent call fetches 200 more, *without*
taking into account the configured per-broker or per-queue memory limits. So
memory consumption is effectively unconstrained here.
I believe this is because the {{recoverMessage}} callback in [JDBCMessageStore.java
recoverNextMessages|https://github.com/apache/activemq/blob/master/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java]
always returns 'true' (at line 368 currently) without asking
{{listener.hasSpace()}}:
{code:java}
@Override
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
    Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
    msg.getMessageId().setBrokerSequenceId(sequenceId);
    msg.getMessageId().setFutureOrSequenceLong(sequenceId);
    msg.getMessageId().setEntryLocator(sequenceId);
    listener.recoverMessage(msg);
    trackLastRecovered(sequenceId, msg.getPriority());
    return true;
}
{code}
... so the result-set loop in DefaultJDBCAdapter::doRecoverNextMessages never
enters the else branch that would abort loading/recovering messages (at line
1093 currently):
{code:java}
while (rs.next() && count < maxReturned) {
    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
        count++;
    } else {
        LOG.debug("Stopped recover next messages");
        break;
    }
}
{code}
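To illustrate how the callback's return value drives that loop, here is a minimal, self-contained sketch. It does not use any ActiveMQ classes: {{RecoveryListener}}, {{RowListener}}, {{BudgetListener}} and {{RecoverySketch}} are hypothetical stand-ins that only mimic the shape of the code quoted above. It also assumes that returning {{hasSpace()}} instead of 'true' would be an acceptable way to stop; whether that is the right fix inside the broker is untested.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the downstream listener (e.g. a cursor with a memory budget). */
interface RecoveryListener {
    void recoverMessage(byte[] data);
    boolean hasSpace();
}

/** Hypothetical stand-in for the per-row callback handed to the JDBC adapter. */
interface RowListener {
    boolean recoverMessage(long sequenceId, byte[] data);
}

/** Accepts payloads and reports space only while a byte budget is not used up. */
class BudgetListener implements RecoveryListener {
    private final long budgetBytes;
    private long usedBytes;
    final List<byte[]> recovered = new ArrayList<>();

    BudgetListener(long budgetBytes) { this.budgetBytes = budgetBytes; }

    @Override public void recoverMessage(byte[] data) {
        recovered.add(data);
        usedBytes += data.length;
    }

    @Override public boolean hasSpace() { return usedBytes < budgetBytes; }
}

class RecoverySketch {
    /** Mimics the quoted loop: stop at maxReturned or when the callback returns false. */
    static int recoverNext(List<byte[]> rows, int maxReturned, RowListener listener) {
        int count = 0;
        long sequenceId = 0;
        for (byte[] row : rows) {
            if (count >= maxReturned) break;
            if (listener.recoverMessage(++sequenceId, row)) {
                count++;
            } else {
                break; // the branch that is never reached when the callback always returns true
            }
        }
        return count;
    }

    /** Callback as reported: always true, so the budget is ignored. */
    static RowListener alwaysTrue(RecoveryListener target) {
        return (seq, data) -> { target.recoverMessage(data); return true; };
    }

    /** Assumed variant: deliver the message, then report whether space remains. */
    static RowListener spaceAware(RecoveryListener target) {
        return (seq, data) -> { target.recoverMessage(data); return target.hasSpace(); };
    }
}
```

Note that in the space-aware variant the message that exhausts the budget is still delivered to the target, but it is not counted by the loop, because the loop increments its counter only when the callback returns true.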
As a workaround, I subclassed the TransactJDBCAdapter and wrapped the listener
to impose my own limit. This is a separately configured limit that is not
coupled to the memory limits of the broker or the queue, so it is more a
workaround than a solution; but it avoids customizing ActiveMQ code, since the
workaround lives in our own product code.
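The wrapping idea could look roughly like the sketch below. This is not our actual product code and it does not compile against ActiveMQ: {{JdbcRowCallback}} is a hypothetical stand-in for the adapter's per-row recovery callback, and {{ByteLimitedCallback}} and {{maxBytesPerBatch}} are illustrative names. The sketch shows only the wrapper, not the adapter subclass that installs it, and it assumes a fresh wrapper is created for each recovery batch.

```java
/** Hypothetical stand-in for the adapter's per-row recovery callback. */
interface JdbcRowCallback {
    boolean recoverMessage(long sequenceId, byte[] data);
}

/** Wraps a callback and ends the batch once a separately configured byte limit is reached. */
class ByteLimitedCallback implements JdbcRowCallback {
    private final JdbcRowCallback delegate;
    private final long maxBytesPerBatch; // assumed limit, not tied to broker or queue memory limits
    private long bytesInBatch;

    ByteLimitedCallback(JdbcRowCallback delegate, long maxBytesPerBatch) {
        this.delegate = delegate;
        this.maxBytesPerBatch = maxBytesPerBatch;
    }

    @Override
    public boolean recoverMessage(long sequenceId, byte[] data) {
        // Always hand the row to the delegate first so that it is not lost.
        boolean delegateWantsMore = delegate.recoverMessage(sequenceId, data);
        bytesInBatch += data.length;
        // Returning false makes the caller's loop take its else branch and stop.
        return delegateWantsMore && bytesInBatch < maxBytesPerBatch;
    }

    long bytesInBatch() { return bytesInBatch; }
}
```

Because the limit is counted in payload bytes per batch, it bounds how much a single recovery call can load, but it still knows nothing about what the broker has already buffered elsewhere.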
> 'recovery' of messages does not take into account the PolicyMap/PolicyEntry
> 'memoryLimit', but takes 200 messages from the store
> --------------------------------------------------------------------------------------------------------------------------------
>
> Key: AMQ-7394
> URL: https://issues.apache.org/jira/browse/AMQ-7394
> Project: ActiveMQ
> Issue Type: Bug
> Components: JDBC
> Affects Versions: 5.15.2
> Reporter: Reinald Verheij
> Priority: Major