Reinald Verheij created AMQ-7394:
------------------------------------

             Summary: 'recovery' of messages does not take into account the 
PolicyMap/PolicyEntry 'memoryLimit', but takes 200 messages from the store
                 Key: AMQ-7394
                 URL: https://issues.apache.org/jira/browse/AMQ-7394
             Project: ActiveMQ
          Issue Type: Bug
          Components: JDBC
    Affects Versions: 5.15.2
            Reporter: Reinald Verheij


I believe this affects versions up to 5.15.11 too, since I don't see changes in 
the related code.

At Infor we currently use ActiveMQ 5.15.2 with the JDBC Persistence Adapter.

When a queue (that is, its tables in the DB) holds many messages of a 
significant size (e.g. 200 messages of 5 MB each) and I start the 
BrokerService, it will 'recover' the messages. The first call fetches 400 
messages; as those are consumed and removed, subsequent calls fetch 200 
messages each *without* taking into account the configured per-broker or 
per-queue memory limits. Memory consumption is therefore effectively 
uncontrolled here.

I believe this is because the {{recoverMessage}} callback in [JDBCMessageStore.java recoverNextMessages|https://github.com/apache/activemq/blob/master/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java]
always returns 'true' (at line 368 currently) without asking 
{{listener.hasSpace()}}:
{code:java}
@Override
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
    Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
    msg.getMessageId().setBrokerSequenceId(sequenceId);
    msg.getMessageId().setFutureOrSequenceLong(sequenceId);
    msg.getMessageId().setEntryLocator(sequenceId);
    listener.recoverMessage(msg);
    trackLastRecovered(sequenceId, msg.getPriority());
    return true;
}
{code}

... so the result set iteration in DefaultJDBCAdapter::doRecoverNextMessages 
never reaches the else branch that aborts loading/recovering messages (at line 
1093 currently):
{code:java}
while (rs.next() && count < maxReturned) {
    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
        count++;
    } else {
        LOG.debug("Stopped recover next messages");
        break;
    }
}
{code}
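
To illustrate the effect, here is a minimal self-contained sketch of the two snippets above. It does not use ActiveMQ classes: {{RowCallback}}, {{BudgetListener}} and {{RecoverLoopSketch}} are hypothetical stand-ins, and returning {{hasSpace()}} from the callback is only one possible change, not necessarily what a real fix would look like.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the per-row callback invoked by the JDBC adapter.
interface RowCallback {
    boolean recoverMessage(long sequenceId, byte[] data);
}

// Hypothetical stand-in for a memory-aware listener that exposes hasSpace().
class BudgetListener {
    private final long limitBytes;
    private long usedBytes;

    BudgetListener(long limitBytes) { this.limitBytes = limitBytes; }

    void recoverMessage(byte[] data) { usedBytes += data.length; }
    boolean hasSpace() { return usedBytes < limitBytes; }
    long usedBytes() { return usedBytes; }
}

class RecoverLoopSketch {
    // Same shape as the quoted loop: stop at maxReturned or when the callback returns false.
    static int recover(List<byte[]> rows, int maxReturned, RowCallback callback) {
        int count = 0;
        for (int i = 0; i < rows.size() && count < maxReturned; i++) {
            if (callback.recoverMessage(i, rows.get(i))) {
                count++;
            } else {
                break;
            }
        }
        return count;
    }

    // Builds n dummy rows of the given payload size.
    static List<byte[]> rows(int n, int size) {
        List<byte[]> rows = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            rows.add(new byte[size]);
        }
        return rows;
    }

    // Behaviour described in the report: the callback always returns true.
    static long bytesLoadedAlwaysTrue(List<byte[]> rows, int maxReturned, long limitBytes) {
        BudgetListener listener = new BudgetListener(limitBytes);
        recover(rows, maxReturned, (seq, data) -> {
            listener.recoverMessage(data);
            return true;
        });
        return listener.usedBytes();
    }

    // Assumed alternative: the callback reports whether the listener still has space.
    static long bytesLoadedWithSpaceCheck(List<byte[]> rows, int maxReturned, long limitBytes) {
        BudgetListener listener = new BudgetListener(limitBytes);
        recover(rows, maxReturned, (seq, data) -> {
            listener.recoverMessage(data);
            return listener.hasSpace();
        });
        return listener.usedBytes();
    }
}
```

With the always-true callback the loop is bounded only by {{maxReturned}}, so the budget is ignored. With the space check the loop stops right after the message that fills the budget.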

I worked around this by subclassing the TransactJDBCAdapter and wrapping the 
listener to impose my own limit. However, this is a separately configured 
limit that is not coupled with the memory limits of the broker or the queue, 
so it is more a workaround than a solution. It does avoid customizing ActiveMQ 
code: the workaround lives in our own product code.
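
A rough sketch of what such a wrapping listener could look like is shown here. It is not our actual code and uses no ActiveMQ types: {{RowRecoveryListener}} is a hypothetical stand-in for the adapter's row listener, and the byte budget ({{maxBytes}}) is an assumed form of the separately configured limit.

```java
// Hypothetical stand-in for the row listener passed to the JDBC adapter.
interface RowRecoveryListener {
    boolean recoverMessage(long sequenceId, byte[] data);
}

// Wraps a listener and refuses further rows once a byte budget would be exceeded.
class LimitingListener implements RowRecoveryListener {
    private final RowRecoveryListener delegate;
    private final long maxBytes;   // assumed: a separately configured limit
    private long bytesSoFar;

    LimitingListener(RowRecoveryListener delegate, long maxBytes) {
        this.delegate = delegate;
        this.maxBytes = maxBytes;
    }

    @Override
    public boolean recoverMessage(long sequenceId, byte[] data) {
        // Always admit the first row so a single oversized message cannot stall recovery.
        if (bytesSoFar > 0 && bytesSoFar + data.length > maxBytes) {
            return false; // makes the adapter loop take its else branch and break
        }
        bytesSoFar += data.length;
        return delegate.recoverMessage(sequenceId, data);
    }
}

class LimitingListenerSketch {
    // Feeds rowCount rows of rowSize bytes through the wrapper and
    // returns how many rows reached the wrapped listener.
    static int deliver(int rowCount, int rowSize, long maxBytes) {
        int[] delivered = {0};
        RowRecoveryListener inner = (seq, data) -> {
            delivered[0]++;
            return true;
        };
        RowRecoveryListener limited = new LimitingListener(inner, maxBytes);
        for (long seq = 0; seq < rowCount; seq++) {
            if (!limited.recoverMessage(seq, new byte[rowSize])) {
                break;
            }
        }
        return delivered[0];
    }
}
```

The wrapper refuses a row before delegating, so the wrapped listener never sees it. This assumes the store resumes from the last sequence it actually tracked, so that a refused row is read again by the next recovery call.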



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
