[ 
https://issues.apache.org/activemq/browse/AMQ-1918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=46016#action_46016
 ] 

Nicusor Tanase commented on AMQ-1918:
-------------------------------------

I found a way to work around this issue, by changing the way messages are 
loaded from the database.
I ran tests with several queues, producers and consumers and did not get any 
undelivered messages anymore.

DefaultJDBCAdapter.doRecoverNextMessages() recovers the messages with ID higher 
then the last recovered messages.
The SQL statement is:
{code:title=org.apache.activemq.store.jdbc.Statements.java|borderStyle=solid}
findNextMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
                                        + " WHERE CONTAINER=? AND ID > ? ORDER 
BY ID";
{code}
However, it can happen that messages with lower id are inserted into the DB 
after messages with higher IDs. 
Such messages do not get recovered from DB.

I have changed on my local copy the DefaultJDBCAdapter to act retroactive, 
looking back {{maxReturned}} rows for any missed messages.
Anyway, I am not familiar with ActiveMQ code, so you might want to have a look 
at the modified DefaultJDBCAdapter.doRecoverNextMessages() bellow:

{code:title=org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.java|borderStyle=solid}
   public class DefaultJDBCAdapter implements JDBCAdapter {

   private Set<Long> lastRecoveredMessagesIds = new TreeSet<Long>();
   -------------------------------------------------------

    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination 
destination, long nextSeq,
                                      int maxReturned, 
JDBCMessageRecoveryListener listener) throws Exception {
        PreparedStatement s = null;
        ResultSet rs = null;
        long id = 0;
        List<Long> cleanupIds = new ArrayList<Long>();
        int index = 0;
        try {
            s = 
c.getConnection().prepareStatement(statements.getFindNextMessagesStatement());
            s.setMaxRows(maxReturned*2);
            s.setString(1, destination.getQualifiedName());
            s.setLong(2, nextSeq - maxReturned);
            rs = s.executeQuery();
            int count = 0;
            if (statements.isUseExternalMessageReferences()) {
                while (rs.next() && count < maxReturned) {
                        id = rs.getLong(1);
                        if ( lastRecoveredMessagesIds.contains(id) ) {
                                // this message was already recovered
                                cleanupIds.add(id);
                                continue;
                        }                       
                    if (listener.recoverMessageReference(rs.getString(1))) {
                        count++;
                        lastRecoveredMessagesIds.add(id);
                    } else {
                        LOG.debug("Stopped recover next messages");
                    }
                }
            } else {
                while (rs.next() && count < maxReturned) {
                        id = rs.getLong(1);
                        if ( lastRecoveredMessagesIds.contains(id) ) {
                                // this message was already recovered
                                cleanupIds.add(id);
                                continue;
                        }
                    if (listener.recoverMessage(rs.getLong(1), 
getBinaryData(rs, 2))) {
                        count++;
                        lastRecoveredMessagesIds.add(id);
                    } else {
                        LOG.debug("Stopped recover next messages");
                    }
                }
            }
            
            //not cleanup the list of recovered messages
            index = 0;
            Iterator<Long> it = cleanupIds.iterator();
            while (it.hasNext() && index < count) {
                lastRecoveredMessagesIds.remove(it.next());
            }
            
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(rs);
            close(s);
        }
    }

}
{code}




> AbstractStoreCursor.size gets out of synch with Store size and blocks 
> consumers
> -------------------------------------------------------------------------------
>
>                 Key: AMQ-1918
>                 URL: https://issues.apache.org/activemq/browse/AMQ-1918
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Message Store
>    Affects Versions: 5.1.0
>            Reporter: Richard Yarger
>            Assignee: Rob Davies
>            Priority: Critical
>             Fix For: 5.3.0
>
>         Attachments: activemq.xml, testAMQMessageStore.zip, testdata.zip
>
>
> In version 5.1.0, we are seeing our queue consumers stop consuming for no 
> reason.
> We have a staged queue environment and we occasionally see one queue display 
> negative pending message counts that hang around -x, rise to -x+n gradually 
> and then fall back to -x abruptly. The messages are building up and being 
> processed in bunches but its not easy to see because the counts are negative. 
> We see this behavior in the messages coming out of the system. Outbound 
> messages come out in bunches and are synchronized with the queue pending 
> count dropping to -x.
> This issue does not happen ALL of the time. It happens about once a week and 
> the only way to fix it is to bounce the broker. It doesn't happen to the same 
> queue everytime, so it is not our consuming code.
> Although we don't have a reproducible scenario, we have been able to debug 
> the issue in our test environment.
> We traced the problem to the cached store size in the AbstractStoreCursor.
> This value becomes 0 or negative and prevents the AbstractStoreCursor from 
> retrieving more messages from the store. (see AbstractStoreCursor.fillBatch() 
> )
> We have seen size value go lower than -1000.
> We have also forced it to fix itself by sending in n+1 messages. Once the 
> size goes above zero, the cached value is refreshed and things work ok again.
> Unfortunately, during low volume times, it could be hours before n+1 messages 
> are received, so our message latency can rise during low volume times.... :(
> I have attached our broker config.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to