[ 
https://issues.apache.org/jira/browse/AMQ-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reinald Verheij updated AMQ-7394:
---------------------------------
    Description: 
I believe this affects versions up to 5.15.11 too, since I don't see changes in 
the related code.

At Infor we currently use ActiveMQ 5.15.2 with the JDBC Persistence Adapter.

When the DB tables for a queue hold many messages of a significant size (e.g. 
200 messages of 5 MB each) and I start the brokerService, it will 'recover' the 
messages from the store. The first call fetches 400 messages; as those are 
consumed and removed, each subsequent call fetches 200 messages *without* 
taking into account the configured per-broker or per-queue memory limits. At 
5 MB per message a single 200-message batch is about 1 GB, so memory 
consumption during recovery is not constrained by those limits.

I believe this is because the {{recoverMessage}} callback in [JDBCMessageStore.java 
recoverNextMessages|https://github.com/apache/activemq/blob/master/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java]
 always returns 'true' (at line 368 currently) without asking 
{{listener.hasSpace()}}:
{code:java}
@Override
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
    Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
    msg.getMessageId().setBrokerSequenceId(sequenceId);
    msg.getMessageId().setFutureOrSequenceLong(sequenceId);
    msg.getMessageId().setEntryLocator(sequenceId);
    listener.recoverMessage(msg);
    trackLastRecovered(sequenceId, msg.getPriority());
    // Unconditional: the caller is never told to stop.
    return true;
}
{code}

... so the result set loop in DefaultJDBCAdapter::doRecoverNextMessages never 
takes the else branch that would abort loading/recovering messages (at line 
1093 currently):
{code:java}
while (rs.next() && count < maxReturned) {
    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
        count++;
    } else {
        LOG.debug("Stopped recover next messages");
        break;
    }
}
{code}
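
To illustrate how a callback that consults {{hasSpace()}} would let the loop above stop early, here is a minimal, self-contained sketch. It is not ActiveMQ code and not a proposed patch: {{SpaceAwareListener}}, {{RowCallback}}, {{BoundedListener}} and the byte budget are hypothetical stand-ins, and the loop iterates over an in-memory list rather than a JDBC result set.

```java
import java.util.ArrayList;
import java.util.List;

public class RecoverLoopSketch {

    /** Hypothetical stand-in for the broker-side listener; only hasSpace() is named in the report. */
    interface SpaceAwareListener {
        boolean hasSpace();
        void recoverMessage(byte[] data);
    }

    /** Hypothetical stand-in for the per-row callback invoked by the adapter loop. */
    interface RowCallback {
        boolean recoverMessage(long sequenceId, byte[] data);
    }

    /** Listener that reports space until an assumed byte budget is used up. */
    static final class BoundedListener implements SpaceAwareListener {
        private final long limitBytes;
        private long usedBytes;
        final List<byte[]> recovered = new ArrayList<>();

        BoundedListener(long limitBytes) {
            this.limitBytes = limitBytes;
        }

        @Override
        public boolean hasSpace() {
            return usedBytes < limitBytes;
        }

        @Override
        public void recoverMessage(byte[] data) {
            usedBytes += data.length;
            recovered.add(data);
        }
    }

    /** Callback that refuses a row when the listener has no space, instead of always returning true. */
    static RowCallback spaceChecking(SpaceAwareListener listener) {
        return (sequenceId, data) -> {
            if (!listener.hasSpace()) {
                return false; // the row is not delivered; the caller's else branch ends the batch
            }
            listener.recoverMessage(data);
            return true;
        };
    }

    /** Same shape as the loop quoted above, over a list instead of a ResultSet. */
    static int recoverNext(List<byte[]> rows, int maxReturned, RowCallback callback) {
        int count = 0;
        long sequenceId = 0;
        for (byte[] row : rows) {
            if (count >= maxReturned) {
                break;
            }
            if (callback.recoverMessage(sequenceId++, row)) {
                count++;
            } else {
                break; // corresponds to "Stopped recover next messages"
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<byte[]> rows = new ArrayList<>();
        for (int i = 0; i < 200; i++) {
            rows.add(new byte[1024]); // 200 rows of 1 KiB each
        }
        BoundedListener listener = new BoundedListener(10 * 1024); // 10 KiB budget
        int count = recoverNext(rows, 200, spaceChecking(listener));
        System.out.println("recovered " + count + " of " + rows.size()); // recovered 10 of 200
    }
}
```

With a callback that always returns true, the same loop would read all 200 rows regardless of the budget, which is the behaviour described in this report.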

As a workaround I subclassed the TransactJDBCAdapter and wrapped the listener to 
impose my own limit. This limit is configured separately and is not coupled to 
the memory limits of the broker or the queue, so it is more a workaround than a 
solution. It does avoid customizing ActiveMQ code: the workaround lives in our 
own product code.
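
A listener wrapper of this kind might look roughly like the sketch below. It is illustrative only and is not the code referred to above: {{RowListener}} is a stand-in that mirrors the callback shape shown earlier (minus the checked exception), and {{LimitingListener}} and {{maxBytesPerBatch}} are hypothetical names.

```java
public class LimitingListenerSketch {

    /** Stand-in for the adapter's per-row callback (sequence id plus raw message bytes). */
    interface RowListener {
        boolean recoverMessage(long sequenceId, byte[] data);
    }

    /** Wraps a delegate and ends the batch once a separately configured byte budget is spent. */
    static final class LimitingListener implements RowListener {
        private final RowListener delegate;
        private final long maxBytesPerBatch; // hypothetical setting, not tied to any broker memoryLimit
        private long bytesSoFar;

        LimitingListener(RowListener delegate, long maxBytesPerBatch) {
            this.delegate = delegate;
            this.maxBytesPerBatch = maxBytesPerBatch;
        }

        @Override
        public boolean recoverMessage(long sequenceId, byte[] data) {
            // Always accept the first row so a single oversized message cannot stall recovery.
            if (bytesSoFar > 0 && bytesSoFar + data.length > maxBytesPerBatch) {
                return false; // row is not forwarded; the adapter loop breaks out
            }
            bytesSoFar += data.length;
            return delegate.recoverMessage(sequenceId, data);
        }
    }

    public static void main(String[] args) {
        int[] forwarded = {0};
        RowListener counting = (id, data) -> {
            forwarded[0]++;
            return true;
        };
        RowListener limited = new LimitingListener(counting, 10); // 10-byte budget per batch
        for (long id = 1; id <= 5; id++) {
            if (!limited.recoverMessage(id, new byte[4])) {
                break;
            }
        }
        System.out.println("forwarded " + forwarded[0] + " messages"); // forwarded 2 messages
    }
}
```

A fresh wrapper would be needed for each batch, since the byte counter is never reset in this sketch.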

  was:
I believe this affects versions up to 5.15.11 too, since I don't see changes in 
the related code.

At Infor we use activeMQ 5.15.2 currently with the JDBC Persistence Adapter.

When a queue (the tables in the DB for the queue) has many messages of a 
significant size (e.g. 200 messages of 5 MB) and I start the brokerService then 
it will 'recover' the messages. The first call will get 400 messages and when 
consuming and removing those subsequent calls will get 200 messages each 
*without* taking into account the configured memory limits per-broker or 
per-queue. So memory consumption can use memory in a rather uncontrollable way 
here.

I believe this is because [JDBCMessageStore.java 
recoverNextMessages|https://github.com/apache/activemq/blob/master/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java]
 always returns 'true' (at line 368 currently) without asking 
{{listener.hasSpace()}} {code:java}                @Override
                public boolean recoverMessage(long sequenceId, byte[] data) 
throws Exception {
                        Message msg = (Message)wireFormat.unmarshal(new 
ByteSequence(data));
                        msg.getMessageId().setBrokerSequenceId(sequenceId);
                        msg.getMessageId().setFutureOrSequenceLong(sequenceId);
                        msg.getMessageId().setEntryLocator(sequenceId);
                        listener.recoverMessage(msg);
                        trackLastRecovered(sequenceId, msg.getPriority());
                        return true;
                }{code}

... so the resultset iterator in DefaultJDBCAdapter::doRecoverNextMessages 
never jumps into the else branch to abort loading/recovering messages (at line 
1093 currently) {code:java}                while (rs.next() && count < 
maxReturned) {
                    if (listener.recoverMessage(rs.getLong(1), 
getBinaryData(rs, 2))) {
                        count++;
                    } else {
                        LOG.debug("Stopped recover next messages");
                        break;
                    }
                }{code}

I did a workaround by subclassing the TransactJDBCAdapter and then wrapping the 
listener to impose my own limit (however this is a separately configured limit; 
it is not coupled with the memory limits of the broker or the queue, so it is 
more a workaround than a solution; but it avoids customizing activeMQ code, the 
workaround is in our own product code.


> 'recovery' of messages does not take into account the PolicyMap/PolicyEntry 
> 'memoryLimit', but takes 200 messages from the store
> --------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-7394
>                 URL: https://issues.apache.org/jira/browse/AMQ-7394
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JDBC
>    Affects Versions: 5.15.2
>            Reporter: Reinald Verheij
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
