[ 
https://issues.apache.org/activemq/browse/AMQ-2955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=62499#action_62499
 ] 

Peter Blackburn commented on AMQ-2955:
--------------------------------------

With a different test harness, we can get this failure to occur at the same 
point every time: at the 286th message sent for a 1K message size, at the 45th 
for 10K, and at the 5th for 100K.

Further investigation shows that when this occurs, the following conditions 
hold:

In class {{Queue}}, method {{doPageIn}}, the following loop body does not get 
executed as {{messages.hasNext()}} is returning false:

{code}
while (messages.hasNext() && count < toPageIn) {
    MessageReference node = messages.next();
    messages.remove();
// snipped for brevity
{code}

At this point, {{count=0}}, {{toPageIn=1}} and {{messages.size()=1}}.

Following the code through to the {{BTreeNode}} class, we find that the leaf 
node contains a single key with value 44. When the {{BTreeNode.BTreeIterator}} 
class is instantiated, it is passed 535 as the default cursor position because 
the {{batchResetNeeded}} flag is false. This causes the loop in the 
{{findNextPage}} method to exit before it sets the {{nextEntry}} field, leaving 
it null.
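
To make the symptom concrete, here is a minimal sketch of how we read it. It is not the KahaDB code: a {{TreeMap}} stands in for the B-tree index, and {{StaleCursorDemo}} and {{iteratorFrom}} are names made up for illustration. It assumes the iterator only looks at keys at or after its starting position, which would explain why a start of 535 finds nothing when the only key is 44.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Simplified model only: a TreeMap stands in for the KahaDB B-tree index.
public class StaleCursorDemo {

    // Returns an iterator over entries whose key is >= startKey, similar in
    // spirit to an iterator that is handed a starting cursor position.
    public static Iterator<Map.Entry<Long, String>> iteratorFrom(
            TreeMap<Long, String> index, long startKey) {
        return index.tailMap(startKey, true).entrySet().iterator();
    }

    public static void main(String[] args) {
        TreeMap<Long, String> index = new TreeMap<>();
        index.put(44L, "stuck message"); // the single key seen in the leaf node

        // Stale cursor: it starts past the only key, so nothing is found
        // even though the index is not empty.
        System.out.println("size=" + index.size()
                + " hasNext(from 535)=" + iteratorFrom(index, 535L).hasNext());

        // Cursor rewound to the start: the entry is visible again.
        System.out.println("hasNext(from 0)=" + iteratorFrom(index, 0L).hasNext());
    }
}
```

In this model the first line reports a size of 1 with no next element, which matches {{messages.size()=1}} alongside {{messages.hasNext()}} returning false.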

If we stick a quick hack into the {{doPageIn}} method in class {{Queue}} then 
the problem seems to go away, but we still don't know what the underlying cause 
was and we are wary of changing code that we don't fully understand:

{code}
if (messages.size() > 0 && !messages.hasNext()) {
    store.resetBatching();
}

while (messages.hasNext() && count < toPageIn) {
    MessageReference node = messages.next();
    messages.remove();
// snipped for brevity
{code}
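
For anyone who wants to experiment with the guard outside the broker, the following is a rough, self-contained sketch of the same idea. {{SketchStore}} is hypothetical and is not the ActiveMQ store API: its {{resetBatching()}} simply rewinds a cursor to zero, and its {{next()}} also removes the entry, whereas the real loop calls {{next()}} and {{remove()}} separately. It only shows why resetting a stale cursor lets the paging loop pick up the remaining message; it says nothing about why the cursor goes stale in the first place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simplified model only; SketchStore is hypothetical, not the ActiveMQ store API.
public class ResetGuardDemo {

    static class SketchStore {
        final TreeMap<Long, String> index = new TreeMap<>();
        long cursor = 0; // next key position to read from

        int size() { return index.size(); }

        // True only if some key exists at or after the cursor.
        boolean hasNext() { return index.ceilingKey(cursor) != null; }

        // Returns and removes the first entry at or after the cursor.
        // Callers must check hasNext() first.
        String next() {
            Long key = index.ceilingKey(cursor);
            cursor = key + 1;
            return index.remove(key);
        }

        // Rewinds the cursor to the start of the index.
        void resetBatching() { cursor = 0; }
    }

    static List<String> pageIn(SketchStore store, int toPageIn, boolean applyGuard) {
        // The guard from the comment: entries exist but the cursor cannot see them.
        if (applyGuard && store.size() > 0 && !store.hasNext()) {
            store.resetBatching();
        }
        List<String> paged = new ArrayList<>();
        while (store.hasNext() && paged.size() < toPageIn) {
            paged.add(store.next());
        }
        return paged;
    }

    public static void main(String[] args) {
        SketchStore store = new SketchStore();
        store.index.put(44L, "stuck message");
        store.cursor = 535; // stale position, as observed in the debugger

        System.out.println("without guard: " + pageIn(store, 1, false));
        System.out.println("with guard:    " + pageIn(store, 1, true));
    }
}
```

Without the guard the model pages in nothing; with it, the single remaining message is returned.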





> Message getting stuck on queue, leading to KahaDB log files not being deleted 
> and disk running out of space
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2955
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2955
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Message Store
>    Affects Versions: 5.4.1
>         Environment: Red Hat Enterprise Linux 5
>            Reporter: Peter Blackburn
>            Priority: Critical
>
> Using the following test client, we see a single message getting stuck on the 
> queue. This then prevents the KahaDB files from being cleaned up. Once this 
> message gets stuck, we need to restart the broker before it can be consumed. 
> This is a total show stopper for us, as when this occurs in our system the 
> large number of messages that we produce and consume each second causes the 
> disk to run out of space within the space of an hour. We also see the same 
> behaviour using synchronous sending and without failover.
> This doesn't happen every time with the test client - the most reliable way I 
> have found to reproduce it is to start the broker and wait for the first 
> MessageDatabase checkpoint to finish before starting the test client. 
> {code:title=Test Client}
> import java.io.BufferedWriter;
> import java.io.FileWriter;
> import java.util.Random;
> import javax.jms.Connection;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
> import javax.jms.ConnectionFactory;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class Test {
>     public static final void main(String[] args) throws Exception {
>         ConnectionFactory cf = new ActiveMQConnectionFactory(
>                 "failover:(tcp://localhost:61616)?jms.useAsyncSend=true&trackMessages=true");
>         final Connection producerConn = cf.createConnection();
>         final Connection consumerConn = cf.createConnection();
>         final BufferedWriter producerLog = new BufferedWriter(new FileWriter("produced.log"));
>         final BufferedWriter consumerLog = new BufferedWriter(new FileWriter("consumed.log"));
>         new Thread(new Runnable() {
>             public void run() {
>                 try {
>                     producerConn.start();
>                     Session session = producerConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
>                     Queue queue = session.createQueue("TEST_QUEUE");
>                     MessageProducer producer = session.createProducer(queue);
>                     Random random = new Random();
>                     byte[] messageBytes = new byte[1024];
>                     for (int i = 0; i < 100000; i++) {
>                     //while (true) {
>                         random.nextBytes(messageBytes);
>                         Message message = session.createObjectMessage(messageBytes);
>                         producer.send(message);
>                         producerLog.write(message.getJMSMessageID());
>                         producerLog.newLine();
>                         producerLog.flush();
>                     }
>                     System.out.println("Produced 100000 messages...");
>                     producerLog.close();
>                 }
>                 catch (Exception e) {
>                     e.printStackTrace();
>                 }
>             }
>         }).start();
>         System.out.println("Started producer...");
>         new Thread(new Runnable() {
>             public void run() {
>                 try {
>                     consumerConn.start();
>                     Session session = consumerConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
>                     Queue queue = session.createQueue("TEST_QUEUE");
>                     MessageConsumer consumer = session.createConsumer(queue);
>                     consumer.setMessageListener(new MessageListener() {
>                         public void onMessage(Message message) {
>                             try {
>                                 message.acknowledge();
>                                 consumerLog.write(message.getJMSMessageID());
>                                 consumerLog.newLine();
>                                 consumerLog.flush();
>                             }
>                             catch (Exception e) {
>                                 e.printStackTrace();
>                             }
>                         }
>                     });
>                 }
>                 catch (Exception e) {
>                     e.printStackTrace();
>                 }
>             }
>         }).start();
>         System.out.println("Started consumer...");
>     }
> }
> {code}
> After the 100,000 messages have been produced, we can see the following 
> difference in the log files:
> {noformat}
> [pblackb...@xxxx test]$ diff produced.log consumed.log
> 10394d10393
> < ID:xxxx-35451-1285948546531-0:0:1:1:10394
> [pblackb...@xxxx test]$
> {noformat}
> Looking in the activemq log file, at around this point we see:
> {noformat}
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, 
> pagedInMessages.size 349, enqueueSize: 10390
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, 
> pagedInMessages.size 350, enqueueSize: 10391
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, 
> pagedInMessages.size 351, enqueueSize: 10392
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, 
> pagedInMessages.size 352, enqueueSize: 10393
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: 
> usage change from: 69% of available memory, to: 70% of available memory
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: 
> usage change from: 70% of available memory, to: 69% of available memory
> 2010-10-01 15:55:51 AbstractStoreCursor [DEBUG] TEST_QUEUE disabling cache on 
> size:0, lastCachedIdSeq: 10398 current node seqId: 10399
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: 
> usage change from: 69% of available memory, to: 70% of available memory
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 2, Inflight: 353, 
> pagedInMessages.size 353, enqueueSize: 10395
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: 
> usage change from: 70% of available memory, to: 69% of available memory
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: 
> usage change from: 69% of available memory, to: 70% of available memory
> {noformat}
> At the end of the log file, where we have a single message stuck on the 
> queue, we see:
> {noformat}
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, 
> pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, 
> pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, 
> pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, 
> pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, 
> pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, 
> pagedInMessages.size 0, enqueueSize: 100000
> {noformat}
> We can see the checkpoint failing to clean up the log files:
> {noformat}
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] Checkpoint started.
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 2 as 
> contained ack(s) refer to referenced file: [1, 2]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 3 as 
> contained ack(s) refer to referenced file: [2, 3]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 4 as 
> contained ack(s) refer to referenced file: [3, 4]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 5 as 
> contained ack(s) refer to referenced file: [4, 5]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] Checkpoint done.
> {noformat}
> At this point our consumer had consumed all of the messages except the single 
> stuck message.
> We are using a clean out of the box set up - we have made no changes to the 
> default activemq.xml file,

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
