Reinald Verheij created AMQ-7394:
------------------------------------
Summary: 'recovery' of messages does not take into account the
PolicyMap/PolicyEntry 'memoryLimit', but takes 200 messages from the store
Key: AMQ-7394
URL: https://issues.apache.org/jira/browse/AMQ-7394
Project: ActiveMQ
Issue Type: Bug
Components: JDBC
Affects Versions: 5.15.2
Reporter: Reinald Verheij
I believe this affects versions up to 5.15.11 too, since I don't see changes in
the related code.
At Infor we use activeMQ 5.15.2 currently with the JDBC Persistence Adapter.
When a queue (the tables in the DB for the queue) has many messages of a
significant size (e.g. 200 messages of 5 MB) and I start the brokerService then
it will 'recover' the messages. The first call will get 400 messages and when
consuming and removing those subsequent calls will get 200 messages each
*without* taking into account the configured memory limits per-broker or
per-queue. So memory consumption can use memory in a rather uncontrollable way
here.
I believe this is because [JDBCMessageStore.java
recoverNextMessages|https://github.com/apache/activemq/blob/master/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java]
always returns 'true' (at line 368 currently) without asking
{{listener.hasSpace()}} {code:java} @Override
public boolean recoverMessage(long sequenceId, byte[] data)
throws Exception {
Message msg = (Message)wireFormat.unmarshal(new
ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
msg.getMessageId().setFutureOrSequenceLong(sequenceId);
msg.getMessageId().setEntryLocator(sequenceId);
listener.recoverMessage(msg);
trackLastRecovered(sequenceId, msg.getPriority());
return true;
}{code}
... so the resultset iterator in DefaultJDBCAdapter::doRecoverNextMessages
never jumps into the else branch to abort loading/recovering messages (at line
1093 currently) {code:java} while (rs.next() && count <
maxReturned) {
if (listener.recoverMessage(rs.getLong(1),
getBinaryData(rs, 2))) {
count++;
} else {
LOG.debug("Stopped recover next messages");
break;
}
}{code}
I did a workaround by subclassing the TransactJDBCAdapter and then wrapping the
listener to impose my own limit (however this is a separately configured limit;
it is not coupled with the memory limits of the broker or the queue, so it is
more a workaround than a solution; but it avoids customizing activeMQ code, the
workaround is in our own product code.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)