I found a bug in the C++ broker's journal recovery code in the
persistent message store library from qpidcomponents.org. I'm not sure
whether this is the correct place to post it, but I couldn't find the
message store component in either JIRA or Red Hat's Bugzilla. If anyone
knows where this should be logged and which component it belongs under,
please let me know and I'll file a bug.

DESCRIPTION:
It is possible to get a queue into a state where it works as
expected while qpidd is running, but if qpidd is shut down, the next
attempt to start it fails with a journal-full error while recovering the
queue from the persistent message store.

REPRODUCING:
1) Launch qpidd with 4 journal files of the default size (24 pages of
64KB each).
2) Create a persistent queue and send messages until an "Enqueue
capacity threshold" error is generated. The messages must be sized such
that a) the first journal file contains more than one message and
b) when the enqueue capacity is hit, all available space in the first 3
journal files is used.
3) Retrieve and accept the first message.
4) Shut down the broker.
5) Attempt to restart the broker. A JERR_JCNTL_RECOVERJFULL
error is generated and the broker exits.

EXPECTED OPERATION:
The broker starts and recovers the stored messages normally in step 5.

I'm attaching a reproducer which does steps 2 & 3 using messages sized
to use 1/2 of a journal file. With messages of that size, the bug is
only triggered by deleting a single message; deleting more than one or
no messages at all allows the broker to be restarted successfully.

Note that the reproducer always retrieves and deletes messages in the
order in which they were inserted. Once the enqueue capacity exactly
matches the threshold, the store can only be recovered properly if,
before the broker is shut down, either no messages are deleted or all
messages residing in the first journal file are deleted.

ANALYSIS:

I've investigated this, and the cause is that the enqueue capacity
is not enforced against delete operations (since they are actually
freeing space), but recovery does not make this distinction on restart.
Thus, delete operations in the journal that lie beyond the enqueue
capacity trigger the journal-full error and make the store
unrecoverable. This is because the journal is considered full if the
last file id is directly before the first file id and the first file
isn't empty.
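
To make the failing case concrete, here is a small stand-alone model of that check. It is only a sketch, not the store's code: `RecoveredState`, `journalFullOnRecovery` and `afterDeletes` are names I made up for illustration, and the scenario assumes 4 journal files, two half-file messages in each of the first three files, and that any dequeue records end up in the fourth file.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical, simplified model of the state gathered during recovery.
struct RecoveredState {
    std::size_t ffid;                // first (oldest) file id
    std::size_t lfid;                // last file id that was written to
    std::vector<unsigned> enqCount;  // records still enqueued, per file
};

// Journal-full test as described above: the file after the last file is
// the first file, and the first file still holds enqueued records.
bool journalFullOnRecovery(const RecoveredState& rd) {
    const std::size_t nextWrFid = (rd.lfid + 1) % rd.enqCount.size();
    return rd.ffid == nextWrFid && rd.enqCount[nextWrFid] > 0;
}

// State for the scenario above after `deleted` messages (0 to 6) have been
// dequeued in insertion order.
RecoveredState afterDeletes(unsigned deleted) {
    RecoveredState rd{0, 2, {2, 2, 2, 0}};
    for (unsigned i = 0; i < deleted; ++i) {
        --rd.enqCount[i / 2];  // two messages per file
    }
    if (deleted > 0) {
        rd.lfid = 3;  // assumption: the dequeue records land in file 3
    }
    return rd;
}
```

In this model `journalFullOnRecovery(afterDeletes(1))` is true, while zero or two deletes give false, which matches what the reproducer shows.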

It should also be possible to encounter this error without ever hitting
the enqueue capacity if enough deletions are performed to move the write
pointer into the last file and the number of deletions isn't enough to
free the first file. For example, if N enqueues of 1 byte messages can
be performed before hitting the threshold, enqueueing N-1 messages then
deleting the first 5 messages should put the journal into an
unrecoverable state.

I have come up with both a workaround and a proposed patch:

WORKAROUND:
The workaround is to never use fewer than 7 journal files. With 7
journal files, the 20% reserved space is equal to 1.4 journal files.
This gives us 40% of a journal file that can be filled with delete
operations before anything is stored in the last journal file. Because
deletions only take 1 DBLK, in the worst case (all messages only use
1 SBLK) we can fully delete all messages from the first journal file
using 25% of a journal file. So enough deletes can always fit into the
last 40% of the second-to-last journal file. By the time we get to the
last journal file, the first file has been completely emptied.
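
The arithmetic behind the number 7 can be checked with a few lines of code. This is just a sketch of the reasoning above: the function names are mine, the 20% and 25% figures come from the paragraph above, and the 25% figure implies 4 DBLKs per SBLK, which I am assuming here rather than reading from the store's headers.

```cpp
#include <cassert>

// Assumptions taken from the text above: 20% of the journal is reserved,
// and emptying a file full of 1-SBLK messages costs 25% of a file in
// 1-DBLK dequeue records (that is, 4 DBLKs per SBLK).
const int kReservedPercent = 20;
const int kWorstCaseDeletePercent = 25;

// Space, in percent of one journal file, that dequeue records can use
// before the write pointer reaches the last file.
int slackPercentBeforeLastFile(int numJournalFiles) {
    return kReservedPercent * numJournalFiles - 100;
}

// True if the first file can always be emptied before the last file is
// reached, under the assumptions above.
bool workaroundHolds(int numJournalFiles) {
    return slackPercentBeforeLastFile(numJournalFiles) >= kWorstCaseDeletePercent;
}
```

With these numbers 6 files leave only 20% of a file as slack, which is less than the 25% needed, so 7 is the smallest count for which the argument works.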

PROPOSED FIX:
The proposed patch is a one-liner that modifies the journal-full
calculation as described in the following pseudo-code:

journal full = original full calculation && (something enqueued in last
file || last file is full)

Explanation:
No messages should ever be enqueued in the last file since the enqueue
capacity check should prevent that, so this is basically a sanity check.
Otherwise, as long as there is still space in the last file, the journal
is not full: the last file only contains deletes or transactions and
there is still space for more deletes or transactions. In this case the
journal can be safely recovered, and once more messages are deleted the
enqueue capacity can drop below the threshold allowing enqueues to
resume.
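
As a stand-alone illustration of the modified calculation, the sketch below applies it to a simplified model of the recovery state. `RecoveryInfo` and `journalFullPatched` are hypothetical names used only here; the fields loosely mirror `_ffid`, `_lfid`, `_enq_cnt_list` and `_lffull` from jcntl.cpp, and the file count is taken from the size of the per-file list.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical, simplified model of the state gathered during recovery.
struct RecoveryInfo {
    std::size_t ffid;                // first (oldest) file id
    std::size_t lfid;                // last file id that was written to
    std::vector<unsigned> enqCount;  // records still enqueued, per file
    bool lastFileFull;               // no space left in the last file
};

// Original condition, plus the extra requirement that the last file
// either contains an enqueued record or has no space left.
bool journalFullPatched(const RecoveryInfo& rd) {
    const std::size_t nextWrFid = (rd.lfid + 1) % rd.enqCount.size();
    const bool originalFull =
        rd.ffid == nextWrFid && rd.enqCount[nextWrFid] > 0;
    return originalFull && (rd.enqCount[rd.lfid] > 0 || rd.lastFileFull);
}
```

For the reproducer's state (one message left in file 0, only a dequeue record in file 3, file 3 not full) this returns false, so recovery would proceed. It still returns true if file 3 is full or holds an enqueued record.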

#include <qpid/client/Connection.h>
#include <qpid/client/LocalQueue.h>
#include <qpid/client/Message.h>
#include <qpid/client/Session.h>
#include <qpid/client/SubscriptionManager.h>
#include <qpid/console/ConsoleListener.h>
#include <qpid/console/SessionManager.h>
#include <qpid/log/Logger.h>
#include <qpid/log/Selector.h>

#include <exception>
#include <iostream>
#include <string>

using namespace std;
using namespace qpid;
using namespace client;
using namespace log;

// The journal will be unrecoverable on startup if exactly one message is
// accepted. For the message and journal sizes used here, accepting zero or
// more than one message allows normal recovery on startup.
int ACCEPT_COUNT = 1;


const string brokerHost = "localhost";
const int brokerPort = 5672;

const string testQueue = "test-queue";
const string testKey = "test-key";
const string testExchange = "amq.topic";

bool createAndBindQueue(Session* session, const string& exchange,
                              const string& queue, const string& key)
{
    qpid::framing::QueueQueryResult qqr = session->queueQuery(queue);
    if(!qqr.hasQueue())
    {
        session->queueDeclare(arg::queue = queue, arg::durable = true, arg::autoDelete = false);
        qqr = session->queueQuery(queue);
        if(!qqr.hasQueue())
        {
            std::cerr << "Error creating queue: " << queue << "." << std::endl;
            return false;
        }
    }
    
    session->exchangeBind(arg::exchange   = exchange,
                          arg::queue      = queue,
                          arg::bindingKey = key);
    return true;
}


int main(int argc, char** argv) {
    LocalQueue lq;
    Subscription sub;
    SubscriptionManager* subMgr = NULL;
    Connection* conn = NULL;
    Session session;
    SubscriptionSettings subSettings;
    Message msg;
    int dataSize = 24 * 64 * 1024 / 2 - 134; // 1/2 of a journal file
    string data(dataSize, 's');              // payload of dataSize bytes

    conn = new Connection();
    conn->open(brokerHost, brokerPort);
    session = conn->newSession();

    
    createAndBindQueue(&session,testExchange, testQueue, testKey);

    while (1) {
        msg.getDeliveryProperties().setRoutingKey(testKey);
        msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
        msg.setData(data);
        
        try {
          session.messageTransfer(arg::destination = testExchange, arg::content = msg);
          cout << "Sent message: size = " << data.size() << endl;
        } catch(exception& e) {
            session.close();
            conn->close();
            break;
        }
    }

    conn->open(brokerHost, brokerPort);
    session = conn->newSession();
    subMgr = new SubscriptionManager(session);

    subSettings.acceptMode = ACCEPT_MODE_EXPLICIT;
    createAndBindQueue(&session,testExchange, testQueue, testKey);
    sub = subMgr->subscribe(lq, testQueue, subSettings);

    for (int i = 0; i < ACCEPT_COUNT; i++) {
        while (!lq.get(msg)) ;
        sub.accept(msg.getId());
        cout << "accepted a message\n";
    }

		
    session.close();
    conn->close();
    delete conn;
    return 0;
}
diff -ur qpid-bdb-store.orig/lib/jrnl/jcntl.cpp qpid-bdb-store//lib/jrnl/jcntl.cpp
--- qpid-bdb-store.orig/lib/jrnl/jcntl.cpp	2012-03-02 13:07:00.619799497 -0500
+++ qpid-bdb-store//lib/jrnl/jcntl.cpp	2012-03-02 13:09:06.410860661 -0500
@@ -640,7 +640,7 @@
 
         // Check for journal full condition
         u_int16_t next_wr_fid = (rd._lfid + 1) % rd._njf;
-        rd._jfull = rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid];
+        rd._jfull = rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid] && (rd._enq_cnt_list[rd._lfid] || rd._lffull);
     }
 }
 
