[ 
https://issues.apache.org/activemq/browse/AMQ-2955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Blackburn updated AMQ-2955:
---------------------------------

    Attachment: TrackedMessageDequeuer.java
                TrackedMessageEnqueuer.java

Attached test harness. 

Compile with the following:

{noformat}
javac -cp .:activemq-core-5.4.1.jar:geronimo-j2ee-management_1.1_spec-1.0.1.jar:log4j-1.2.15.jar:commons-logging.jar:jms.jar *.java
{noformat}

To reproduce the error, we restart the ActiveMQ broker and wait until we see
the MessageDatabase "Checkpoint done" message in the log, then kick off the
enqueuer as follows:

{noformat}
java -cp .:jms.jar:activemq-core-5.4.1.jar:geronimo-j2ee-management_1.1_spec-1.0.1.jar:log4j-1.2.15.jar:commons-logging.jar jms.TrackedMessageEnqueuer 'tcp://localhost:61616' 1 10 0
{noformat}

and then immediately kick off the dequeuer as follows:

{noformat}
java -cp .:jms.jar:lib/activemq-core-5.4.1.jar:geronimo-j2ee-management_1.1_spec-1.0.1.jar:log4j-1.2.15.jar:commons-logging.jar jms.TrackedMessageDequeuer 'tcp://localhost:61616'
{noformat}

We don't always get the error, but when it occurs it is always on the 45th 
message sent, using the 10K message size as shown.
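
For anyone reproducing without the attachments, a minimal enqueuer along the same
lines as the test client in the issue description would look roughly like the
sketch below. The class name, queue name, and argument handling here are
illustrative only and do not necessarily match the attached TrackedMessageEnqueuer.

{code}
import java.io.BufferedWriter;
import java.io.FileWriter;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

// Illustrative sketch only -- not the attached TrackedMessageEnqueuer.
public class SimpleTrackedEnqueuer {
    public static void main(String[] args) throws Exception {
        String brokerUrl = args[0];                    // e.g. tcp://localhost:61616
        int messageCount = Integer.parseInt(args[1]);  // number of messages to send
        int sizeKb = Integer.parseInt(args[2]);        // payload size in KB

        ConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);
        Connection connection = cf.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("TEST_QUEUE");
        MessageProducer producer = session.createProducer(queue);

        // Log each broker-assigned JMSMessageID so produced.log can be diffed
        // against the dequeuer's consumed.log afterwards.
        BufferedWriter log = new BufferedWriter(new FileWriter("produced.log"));
        byte[] payload = new byte[sizeKb * 1024];
        for (int i = 0; i < messageCount; i++) {
            Message message = session.createObjectMessage(payload);
            producer.send(message);
            log.write(message.getJMSMessageID());
            log.newLine();
            log.flush();
        }
        log.close();
        connection.close();
    }
}
{code}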

> Message getting stuck on queue, leading to KahaDB log files not being deleted and disk running out of space
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2955
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2955
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Message Store
>    Affects Versions: 5.4.1
>         Environment: Red Hat Enterprise Linux 5
>            Reporter: Peter Blackburn
>            Priority: Critical
>         Attachments: TrackedMessageDequeuer.java, TrackedMessageEnqueuer.java
>
>
> Using the following test client, we see a single message getting stuck on the
> queue, which then prevents the KahaDB log files from being cleaned up. Once a
> message gets stuck, we need to restart the broker before it can be consumed.
> This is a total show stopper for us: with the large number of messages we
> produce and consume each second, the disk runs out of space within an hour
> whenever this occurs in our system. We also see the same behaviour using
> synchronous sending and without failover (see the note after the code
> listing below).
> This doesn't happen every time with the test client - the most reliable way I
> have found to reproduce it is to start the broker and wait for the first
> MessageDatabase checkpoint to finish before starting the test client.
> {code:title=Test Client}
> import java.io.BufferedWriter;
> import java.io.FileWriter;
> import java.util.Random;
> 
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
> 
> import org.apache.activemq.ActiveMQConnectionFactory;
> 
> public class Test {
>     public static final void main(String[] args) throws Exception {
>         ConnectionFactory cf = new ActiveMQConnectionFactory(
>                 "failover:(tcp://localhost:61616)?jms.useAsyncSend=true&trackMessages=true");
>         final Connection producerConn = cf.createConnection();
>         final Connection consumerConn = cf.createConnection();
>         final BufferedWriter producerLog = new BufferedWriter(new FileWriter("produced.log"));
>         final BufferedWriter consumerLog = new BufferedWriter(new FileWriter("consumed.log"));
> 
>         // Producer thread: send 100,000 messages and log each JMSMessageID.
>         new Thread(new Runnable() {
>             public void run() {
>                 try {
>                     producerConn.start();
>                     Session session = producerConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
>                     Queue queue = session.createQueue("TEST_QUEUE");
>                     MessageProducer producer = session.createProducer(queue);
>                     Random random = new Random();
>                     byte[] messageBytes = new byte[1024];
>                     for (int i = 0; i < 100000; i++) {
>                     //while (true) {
>                         random.nextBytes(messageBytes);
>                         Message message = session.createObjectMessage(messageBytes);
>                         producer.send(message);
>                         producerLog.write(message.getJMSMessageID());
>                         producerLog.newLine();
>                         producerLog.flush();
>                     }
>                     System.out.println("Produced 100000 messages...");
>                     producerLog.close();
>                 }
>                 catch (Exception e) {
>                     e.printStackTrace();
>                 }
>             }
>         }).start();
>         System.out.println("Started producer...");
> 
>         // Consumer thread: acknowledge each message and log its JMSMessageID.
>         new Thread(new Runnable() {
>             public void run() {
>                 try {
>                     consumerConn.start();
>                     Session session = consumerConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
>                     Queue queue = session.createQueue("TEST_QUEUE");
>                     MessageConsumer consumer = session.createConsumer(queue);
>                     consumer.setMessageListener(new MessageListener() {
>                         public void onMessage(Message message) {
>                             try {
>                                 message.acknowledge();
>                                 consumerLog.write(message.getJMSMessageID());
>                                 consumerLog.newLine();
>                                 consumerLog.flush();
>                             }
>                             catch (Exception e) {
>                                 e.printStackTrace();
>                             }
>                         }
>                     });
>                 }
>                 catch (Exception e) {
>                     e.printStackTrace();
>                 }
>             }
>         }).start();
>         System.out.println("Started consumer...");
>     }
> }
> {code}
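> For the synchronous, non-failover runs mentioned above, the only change is the
> URL passed to the connection factory; the rest of the client is unchanged. A
> minimal illustration (not part of the test client itself):
> {code}
> import javax.jms.ConnectionFactory;
> import org.apache.activemq.ActiveMQConnectionFactory;
> 
> // Both variants reproduce the stuck message for us.
> public class ConnectionFactories {
>     // Failover + async send, as used in the test client above:
>     static final ConnectionFactory async = new ActiveMQConnectionFactory(
>             "failover:(tcp://localhost:61616)?jms.useAsyncSend=true&trackMessages=true");
> 
>     // Plain TCP with the default synchronous send:
>     static final ConnectionFactory sync = new ActiveMQConnectionFactory("tcp://localhost:61616");
> }
> {code}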
> After the 100,000 messages have been produced, we can see the following 
> difference in the log files:
> {noformat}
> [pblackb...@xxxx test]$ diff produced.log consumed.log
> 10394d10393
> < ID:xxxx-35451-1285948546531-0:0:1:1:10394
> [pblackb...@xxxx test]$
> {noformat}
> Looking in the activemq log file, at around this point we see:
> {noformat}
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 349, enqueueSize: 10390
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 350, enqueueSize: 10391
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 351, enqueueSize: 10392
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 352, enqueueSize: 10393
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 69% of available memory, to: 70% of available memory
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 70% of available memory, to: 69% of available memory
> 2010-10-01 15:55:51 AbstractStoreCursor [DEBUG] TEST_QUEUE disabling cache on size:0, lastCachedIdSeq: 10398 current node seqId: 10399
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 69% of available memory, to: 70% of available memory
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 2, Inflight: 353, pagedInMessages.size 353, enqueueSize: 10395
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 70% of available memory, to: 69% of available memory
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 69% of available memory, to: 70% of available memory
> {noformat}
> At the end of the log file, where we have a single message stuck on the 
> queue, we see:
> {noformat}
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
> {noformat}
> We can see the checkpoint failing to clean up the log files:
> {noformat}
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] Checkpoint started.
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 2 as contained ack(s) refer to referenced file: [1, 2]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 3 as contained ack(s) refer to referenced file: [2, 3]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 4 as contained ack(s) refer to referenced file: [3, 4]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 5 as contained ack(s) refer to referenced file: [4, 5]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] Checkpoint done.
> {noformat}
> At this point our consumer had consumed all of the messages except the single 
> stuck message.
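> One way to confirm the message is still sitting on the broker (a sketch, not
> part of the attached clients; whether a stuck message appears in a browse may
> depend on its dispatch state) is to browse the queue and print whatever is
> left on it:
> {code}
> import java.util.Enumeration;
> 
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.Message;
> import javax.jms.Queue;
> import javax.jms.QueueBrowser;
> import javax.jms.Session;
> 
> import org.apache.activemq.ActiveMQConnectionFactory;
> 
> // Sketch only: browse TEST_QUEUE and print the JMSMessageID of anything still on it.
> public class StuckMessageBrowser {
>     public static void main(String[] args) throws Exception {
>         ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
>         Connection connection = cf.createConnection();
>         connection.start();
>         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>         Queue queue = session.createQueue("TEST_QUEUE");
>         QueueBrowser browser = session.createBrowser(queue);
>         Enumeration<?> messages = browser.getEnumeration();
>         while (messages.hasMoreElements()) {
>             Message message = (Message) messages.nextElement();
>             System.out.println(message.getJMSMessageID());
>         }
>         connection.close();
>     }
> }
> {code}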
> We are using a clean, out-of-the-box setup - we have made no changes to the
> default activemq.xml file.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
