On Tue, Apr 08, 2008 at 10:20:57AM -0400, Ken Murchison wrote:
> I just put together a release candidate for Cyrus 2.3.12.  I'd
> appreciate any independent testing before I release this to the masses.
>
> http://www.contrib.andrew.cmu.edu/~murch/cyrus-imapd-2.3.12rc1.tar.gz

Notably missing:
http://cyrus.brong.fastmail.fm/patches/cyrus-skiplist-transactions-2.3.10.diff

This is the code that makes it impossible to accidentally start a new
transaction while still in a transaction on the same file.

I've re-rolled it to apply directly on top of 2.3.12rc1 so it should
apply cleanly to CVS as well...

The "already open" stuff is necessary, because otherwise you wind up
with two different sets of information about which locks you are
holding - but at least on linux, if you fcntl lock a file and then
relock the same range of the same file from the same process, the
second lock overrides the first one, so a read lock (or unlock) from
one "database" could nuke the write lock held by the other copy.

It appears that there are two different copies of the seen database
still being opened somewhere in the code.  I haven't tracked that
down yet...

I am quite happy however to remove the syslog statement so we don't
confuse people.  The multiple open handing hasn't caused any issues
here at FastMail, and we've been running this patch for months now.

----

in other things - I haven't tested the rest of the code yet - I
just thought I'd double-triple-check for this one, since we started
seeing corruptions on 2.3.11 when I assumed it had been included
without checking!

Bron.
Skiplist Transaction Sanity Checks

<b>Candidate for Upstream</b> (at the very least the
current_txn and auto-promiting un-transactioned reads
is a good sanity check)

The Cyrus skiplist library was written by a very trusting
person who believed that nobody would ever fail to pass a
transaction object during an ongoing transaction.

Unfortunately, some of FastMail's patches call functions
which do a lookup on the mailboxes.db without knowing that
they are midway through a transaction.  This causes the
write lock to be degraded to a read lock and then no lock
at all, all without the skiplist code noticing that it was
supposed to be holding a lock!

While the best long-term solution would be a rewrite with
a serious eye to ensuring consistency of lock usage and
some more sanity checking, there is a simple solution which
will at least capture any mistakes.  I consider them serious
enough that an "assert" is fine.

fcntl locking appears to be be per-process rather than per
fd in our testing, so having multiple database objects pointing
at the same file is also bogus as they could interfere with
each others' locks.  This patch ensures that doesn't happen
with a linked list of "open skiplist databases" and a refcount.

To ensure consistency, the database struct has an additional
member "current_txn" of type "struct txn" and the following logic.

* If any operation is called with a transaction, current_txn
  MUST be set and must equal the passed transaction.

* If any _write_ operation (store, delete) is called without
  a transaction (or with a blank transaction), current_txn 
  MUST be NULL.

* If any _read_ operation (fetch, foreach) is called with a
  blank transaction then current_txn MUST be NULL, however if
  it is called with a NULL transaction pointer, then the
  read operation will happen as if it had been called with
  current_txn instead (i.e. no read lock or unlock, as there
  is already a write lock active)

* If any operation creates a new transaction, it also sets
  current_txn on the database object.

* Any time commit or abort is called, it clears current_txn
  after it has succeeded.

Apart from the bugs caused by out other patches, this has 
already found a bug in seen_merge as used by sync_server. 
It is fixed in a separate patch after this one in the series.
Index: cyrus-imapd-2.3.12rc1/lib/cyrusdb_skiplist.c
===================================================================
--- cyrus-imapd-2.3.12rc1.orig/lib/cyrusdb_skiplist.c	2008-03-24 13:43:08.000000000 -0400
+++ cyrus-imapd-2.3.12rc1/lib/cyrusdb_skiplist.c	2008-04-09 07:27:37.000000000 -0400
@@ -145,6 +145,16 @@ enum {
     WRITELOCKED = 2,
 };
 
+struct txn {
+    int ismalloc;
+    int syncfd;
+
+    /* logstart is where we start changes from on commit, where we truncate
+       to on abort */
+    int logstart;
+    int logend;			/* where to write to continue this txn */
+};
+
 struct db {
     /* file data */
     char *fname;
@@ -167,22 +177,20 @@ struct db {
     /* tracking info */
     int lock_status;
     int is_open;
+    struct txn *current_txn;
 
     /* comparator function to use for sorting */
     int (*compar) (const char *s1, int l1, const char *s2, int l2);
 };
 
-struct txn {
-    int ismalloc;
-    int syncfd;
-
-    /* logstart is where we start changes from on commit, where we truncate
-       to on abort */
-    unsigned logstart;
-    unsigned logend;		/* where to write to continue this txn */
+struct db_list {
+    struct db *db;
+    struct db_list *next;
+    int refcount;
 };
 
 static time_t global_recovery = 0;
+static struct db_list *open_db = NULL;
 
 /* Perform an FSYNC/FDATASYNC if we are *not* operating in UNSAFE mode */
 #define DO_FSYNC (!libcyrus_config_getswitch(CYRUSOPT_SKIPLIST_UNSAFE))
@@ -259,6 +267,8 @@ static int myinit(const char *dbdir, int
 
     srand(time(NULL) * getpid());
 
+    open_db = NULL;
+
     return 0;
 }
 
@@ -731,10 +741,24 @@ static int dispose_db(struct db *db)
 
 static int myopen(const char *fname, int flags, struct db **ret)
 {
-    struct db *db = (struct db *) xzmalloc(sizeof(struct db));
+    struct db *db;
+    struct db_list *list_ent = open_db;
     int r;
     int new = 0;
 
+    while (list_ent && strcmp(list_ent->db->fname, fname)) {
+	list_ent = list_ent->next;
+    }
+    if (list_ent) {
+	/* we already have this DB open! */
+	syslog(LOG_NOTICE, "skiplist: %s is already open %d time%s, returning object", 
+	fname, list_ent->refcount, list_ent->refcount == 1 ? "" : "s");
+	*ret = list_ent->db;
+	++list_ent->refcount;
+	return 0;
+    }
+
+    db = (struct db *) xzmalloc(sizeof(struct db));
     db->fd = -1;
     db->fname = xstrdup(fname);
     db->compar = (flags & CYRUSDB_MBOXSORT) ? bsearch_ncompare : compare;
@@ -844,12 +868,36 @@ static int myopen(const char *fname, int
     }
 
     *ret = db;
+
+    /* track this database in the open list */
+    list_ent = (struct db_list *) xzmalloc(sizeof(struct db_list));
+    list_ent->db = db;
+    list_ent->next = open_db;
+    list_ent->refcount = 1;
+    open_db = list_ent;
+
     return 0;
 }
 
 int myclose(struct db *db)
 {
-    return dispose_db(db);
+    struct db_list *list_ent = open_db;
+    struct db_list *prev = NULL;
+
+    /* remove this DB from the open list */
+    while (list_ent && list_ent->db != db) {
+	prev = list_ent;
+	list_ent = list_ent->next;
+    }
+    assert(list_ent);
+    if (--list_ent->refcount <= 0) {
+	if (prev) prev->next = list_ent->next;
+	else open_db = list_ent->next;
+	free(list_ent);
+	return dispose_db(db);
+    }
+
+    return 0;
 }
 
 static int compare(const char *s1, int l1, const char *s2, int l2)
@@ -917,13 +965,18 @@ int myfetch(struct db *db,
     if (datalen) *datalen = 0;
 
     if (!mytid) {
-	/* grab a r lock */
-	if ((r = read_lock(db)) < 0) {
-	    return r;
+	if (db->current_txn == NULL) {
+	    /* grab a r lock */
+	    if ((r = read_lock(db)) < 0) {
+		return r;
+	    }
+	    tp = NULL;
+	} else {
+	    tp = db->current_txn;
+	    update_lock(db, tp);
 	}
-
-	tp = NULL;
     } else if (!*mytid) {
+	assert(db->current_txn == NULL);
 	/* grab a r/w lock */
 	if ((r = write_lock(db, NULL)) < 0) {
 	    return r;
@@ -934,6 +987,7 @@ int myfetch(struct db *db,
 
 	tp = &t;
     } else {
+	assert(db->current_txn == *mytid);
 	tp = *mytid;
 	update_lock(db, tp);
     }
@@ -955,8 +1009,10 @@ int myfetch(struct db *db,
 	    *mytid = xmalloc(sizeof(struct txn));
 	    memcpy(*mytid, tp, sizeof(struct txn));
 	    (*mytid)->ismalloc = 1;
+
+	    db->current_txn = *mytid;
 	}
-    } else {
+    } else if (!tp) {
 	/* release read lock */
 	int r1;
 	if ((r1 = unlock(db)) < 0) {
@@ -1002,13 +1058,18 @@ int myforeach(struct db *db,
     assert(prefixlen >= 0);
 
     if (!tid) {
-	/* grab a r lock */
-	if ((r = read_lock(db)) < 0) {
-	    return r;
+	if (db->current_txn == NULL) {
+	    /* grab a r lock */
+	    if ((r = read_lock(db)) < 0) {
+		return r;
+	    }
+	    tp = NULL;
+	} else {
+	    tp = db->current_txn;
+	    update_lock(db, tp);
 	}
-
-	tp = NULL;
     } else if (!*tid) {
+	assert(db->current_txn == NULL);
 	/* grab a r/w lock */
 	if ((r = write_lock(db, NULL)) < 0) {
 	    return r;
@@ -1019,6 +1080,7 @@ int myforeach(struct db *db,
 
 	tp = &t;
     } else {
+	assert(db->current_txn == *tid);
 	tp = *tid;
 	update_lock(db, tp);
     }
@@ -1035,7 +1097,7 @@ int myforeach(struct db *db,
 	    ino_t ino = db->map_ino;
 	    unsigned long sz = db->map_size;
 
-	    if (!tid) {
+	    if (!tp) {
 		/* release read lock */
 		if ((r = unlock(db)) < 0) {
 		    return r;
@@ -1054,7 +1116,7 @@ int myforeach(struct db *db,
 	    cb_r = cb(rock, KEY(ptr), KEYLEN(ptr), DATA(ptr), DATALEN(ptr));
 	    if (cb_r) break;
 
-	    if (!tid) {
+	    if (!tp) {
 		/* grab a r lock */
 		if ((r = read_lock(db)) < 0) {
 		    return r;
@@ -1096,8 +1158,10 @@ int myforeach(struct db *db,
 	    *tid = xmalloc(sizeof(struct txn));
 	    memcpy(*tid, tp, sizeof(struct txn));
 	    (*tid)->ismalloc = 1;
+
+	    db->current_txn = *tid;
 	}
-    } else {
+    } else if (!tp) {
 	/* release read lock */
 	if ((r = unlock(db)) < 0) {
 	    return r;
@@ -1149,6 +1213,7 @@ int mystore(struct db *db, 
     assert(key && keylen);
 
     if (!tid || !*tid) {
+	assert(db->current_txn == NULL);
 	/* grab a r/w lock */
 	if ((r = write_lock(db, NULL)) < 0) {
 	    return r;
@@ -1158,7 +1223,10 @@ int mystore(struct db *db, 
 	if ((r = newtxn(db, &t))) return r;
 
 	tp = &t;
+
+	db->current_txn = tp;
     } else {
+	assert(db->current_txn == *tid);
 	tp = *tid;
 	update_lock(db, tp);
     }
@@ -1267,6 +1335,8 @@ int mystore(struct db *db, 
 	    *tid = xmalloc(sizeof(struct txn));
 	    memcpy(*tid, tp, sizeof(struct txn));
 	    (*tid)->ismalloc = 1;
+
+	    db->current_txn = *tid;
 	}
 
 	if (be_paranoid) {
@@ -1310,6 +1380,7 @@ int mydelete(struct db *db, 
     int r;
 
     if (!tid || !*tid) {
+	assert(db->current_txn == NULL);
 	/* grab a r/w lock */
 	if ((r = write_lock(db, NULL)) < 0) {
 	    return r;
@@ -1319,7 +1390,10 @@ int mydelete(struct db *db, 
 	if ((r = newtxn(db, &t))) return r;
 
 	tp = &t;
+
+	db->current_txn = tp;
     } else {
+	assert(db->current_txn == *tid);
 	tp = *tid;
 	update_lock(db, tp);
     }
@@ -1372,6 +1446,8 @@ int mydelete(struct db *db, 
 	    *tid = xmalloc(sizeof(struct txn));
 	    memcpy(*tid, tp, sizeof(struct txn));
 	    (*tid)->ismalloc = 1;
+
+	    db->current_txn = *tid;
 	}
 
 	if (be_paranoid) {
@@ -1392,6 +1468,8 @@ int mycommit(struct db *db, struct txn *
 
     assert(db && tid);
 
+    assert(db->current_txn == tid);
+
     update_lock(db, tid);
 
     if (be_paranoid) {
@@ -1430,13 +1508,16 @@ int mycommit(struct db *db, struct txn *
     }
 
  done:
+    if (!r)
+	db->current_txn = NULL;
+
     /* consider checkpointing */
     if (!r && tid->logend > (2 * db->logstart + SKIPLIST_MINREWRITE)) {
 	r = mycheckpoint(db, 1);
     }
     
     if (be_paranoid) {
-	assert(myconsistent(db, NULL, 1) == 0);
+	assert(myconsistent(db, db->current_txn, 1) == 0);
     }
 
     if (r) {
@@ -1476,6 +1557,8 @@ int myabort(struct db *db, struct txn *t
 
     assert(db && tid);
 
+    assert(db->current_txn == tid);
+
     /* update the mmap so we can see the log entries we need to remove */
     update_lock(db, tid);
     
@@ -1564,6 +1647,8 @@ int myabort(struct db *db, struct txn *t
 	free(tid);
     }
 
+    db->current_txn = NULL;
+
     return 0;
 }
 
@@ -1595,6 +1680,9 @@ static int mycheckpoint(struct db *db, i
 		    db->fname, 0);
     }
 
+    /* can't be in a transaction */
+    assert(db->current_txn == NULL);
+
     if ((r = myconsistent(db, NULL, 1)) < 0) {
 	syslog(LOG_ERR, "db %s, inconsistent pre-checkpoint, bailing out",
 	       db->fname);
@@ -1869,6 +1957,8 @@ static int myconsistent(struct db *db, s
     const char *ptr;
     bit32 offset;
 
+    assert(db->current_txn == tid); /* could both be null */
+
     if (!locked) read_lock(db);
     else if (tid) update_lock(db, tid);
 
@@ -1944,6 +2034,9 @@ static int recovery(struct db *db, int f
 	return 0;
     }
 
+    /* can't run recovery inside a txn */
+    assert(db->current_txn == NULL);
+
     db->listsize = 0;
 
     ptr = DUMMY_PTR(db);

Reply via email to