[
https://issues.apache.org/activemq/browse/AMQ-2955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Peter Blackburn updated AMQ-2955:
---------------------------------
Attachment: activemq.xml
Attached {{activemq.xml}} file used.
> Message getting stuck on queue, leading to KahaDB log files not being deleted
> and disk running out of space
> -----------------------------------------------------------------------------------------------------------
>
> Key: AMQ-2955
> URL: https://issues.apache.org/activemq/browse/AMQ-2955
> Project: ActiveMQ
> Issue Type: Bug
> Components: Message Store
> Affects Versions: 5.4.1
> Environment: Red Hat Enterprise Linux 5
> Reporter: Peter Blackburn
> Priority: Critical
> Attachments: activemq.xml, TrackedMessageDequeuer.java,
> TrackedMessageEnqueuer.java
>
>
> Using the following test client, we see a single message getting stuck on the
> queue. This then prevents the KahaDB files from being cleaned up. Once this
> message gets stuck, we need to restart the broker before it can be consumed.
> This is a total show stopper for us, as when this occurs in our system the
> large number of messages that we produce and consume each second causes the
> disk to run out of space within the space of an hour. We also see the same
> behaviour using synchronous sending and without failover.
> This doesn't happen every time with the test client - the most reliable way I
> have found to reproduce it is to start the broker and wait for the first
> MessageDatabase checkpoint to finish before starting the test client.
> {code:title=Test Client}
> import java.io.BufferedWriter;
> import java.io.FileWriter;
> import java.util.Random;
> import javax.jms.Connection;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
> import javax.jms.ConnectionFactory;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class Test {
> public static final void main(String[] args) throws Exception {
> ConnectionFactory cf = new
> ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?jms.useAsyncSend=true&trackMessages=true");
> final Connection producerConn = cf.createConnection();
> final Connection consumerConn = cf.createConnection();
> final BufferedWriter producerLog = new BufferedWriter(new
> FileWriter("produced.log"));
> final BufferedWriter consumerLog = new BufferedWriter(new
> FileWriter("consumed.log"));
> new Thread(new Runnable() {
> public void run() {
> try {
> producerConn.start();
> Session session =
> producerConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
> Queue queue =
> session.createQueue("TEST_QUEUE");
> MessageProducer producer =
> session.createProducer(queue);
> Random random = new Random();
> byte[] messageBytes = new byte[1024];
> for (int i = 0; i < 100000; i++) {
> //while (true) {
>
> random.nextBytes(messageBytes);
> Message message =
> session.createObjectMessage(messageBytes);
> producer.send(message);
>
> producerLog.write(message.getJMSMessageID());
> producerLog.newLine();
> producerLog.flush();
> }
> System.out.println("Produced 100000
> messages...");
> producerLog.close();
> }
> catch (Exception e) {
> e.printStackTrace();
> }
> }
> }).start();
> System.out.println("Started producer...");
> new Thread(new Runnable() {
> public void run() {
> try {
> consumerConn.start();
> Session session =
> consumerConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
> Queue queue =
> session.createQueue("TEST_QUEUE");
> MessageConsumer consumer =
> session.createConsumer(queue);
> consumer.setMessageListener(new
> MessageListener() {
> public void onMessage(Message
> message) {
> try {
>
> message.acknowledge();
>
> consumerLog.write(message.getJMSMessageID());
>
> consumerLog.newLine();
>
> consumerLog.flush();
> }
> catch (Exception e) {
>
> e.printStackTrace();
> }
> }
> });
> }
> catch (Exception e) {
> e.printStackTrace();
> }
> }
> }).start();
> System.out.println("Started consumer...");
> }
> }
> {code}
> After the 100,000 messages have been produced, we can see the following
> difference in the log files:
> {noformat}
> [pblackb...@xxxx test]$ diff produced.log consumed.log
> 10394d10393
> < ID:xxxx-35451-1285948546531-0:0:1:1:10394
> [pblackb...@xxxx test]$
> {noformat}
> Looking in the activemq log file, at around this point we see:
> {noformat}
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205,
> pagedInMessages.size 349, enqueueSize: 10390
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205,
> pagedInMessages.size 350, enqueueSize: 10391
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205,
> pagedInMessages.size 351, enqueueSize: 10392
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205,
> pagedInMessages.size 352, enqueueSize: 10393
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory:
> usage change from: 69% of available memory, to: 70% of available memory
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory:
> usage change from: 70% of available memory, to: 69% of available memory
> 2010-10-01 15:55:51 AbstractStoreCursor [DEBUG] TEST_QUEUE disabling cache on
> size:0, lastCachedIdSeq: 10398 current node seqId: 10399
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory:
> usage change from: 69% of available memory, to: 70% of available memory
> 2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 2, Inflight: 353,
> pagedInMessages.size 353, enqueueSize: 10395
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory:
> usage change from: 70% of available memory, to: 69% of available memory
> 2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory:
> usage change from: 69% of available memory, to: 70% of available memory
> {noformat}
> At the end of the log file, where we have a single message stuck on the
> queue, we see:
> {noformat}
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0,
> pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0,
> pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0,
> pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0,
> pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0,
> pagedInMessages.size 0, enqueueSize: 100000
> 2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0,
> pagedInMessages.size 0, enqueueSize: 100000
> {noformat}
> We can see the checkpoint failing to clean up the log files:
> {noformat}
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] Checkpoint started.
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 2 as
> contained ack(s) refer to referenced file: [1, 2]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 3 as
> contained ack(s) refer to referenced file: [2, 3]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 4 as
> contained ack(s) refer to referenced file: [3, 4]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 5 as
> contained ack(s) refer to referenced file: [4, 5]
> 2010-10-01 15:56:36 MessageDatabase [DEBUG] Checkpoint done.
> {noformat}
> At this point our consumer had consumed all of the messages except the single
> stuck message.
> We are using a clean out of the box set up - we have made no changes to the
> default activemq.xml file,
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.