commit 1ba0cd7b96de08b843c77e4b6385bf6d34bd2afa
Author: Oswald Buddenhagen <o...@users.sf.net>
Date:   Tue May 31 09:58:41 2022 +0200

    factor out sync_state.c & sync_p.h from sync.c
    
    while moving the code, localize some variables, and use C99 comments.

 src/Makefile.am  |   4 +-
 src/sync.c       | 644 +----------------------------------------------
 src/sync_p.h     |  93 +++++++
 src/sync_state.c | 569 +++++++++++++++++++++++++++++++++++++++++
 4 files changed, 665 insertions(+), 645 deletions(-)

diff --git a/src/Makefile.am b/src/Makefile.am
index 5e3d5aa2..e06005da 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -7,12 +7,12 @@ mbsync_SOURCES = \
        driver.c drv_proxy.c \
        drv_imap.c \
        drv_maildir.c \
-       sync.c \
+       sync.c sync_state.c \
        main.c
 noinst_HEADERS = \
        common.h config.h socket.h \
        driver.h \
-       sync.h
+       sync.h sync_p.h
 mbsync_LDADD = $(DB_LIBS) $(SSL_LIBS) $(SOCK_LIBS) $(SASL_LIBS) $(Z_LIBS) 
$(KEYCHAIN_LIBS)
 
 drv_proxy.$(OBJEXT): drv_proxy.inc
diff --git a/src/sync.c b/src/sync.c
index b4f74af0..2e69b88b 100644
--- a/src/sync.c
+++ b/src/sync.c
@@ -5,16 +5,7 @@
  * mbsync - mailbox synchronizer
  */
 
-#define DEBUG_FLAG DEBUG_SYNC
-
-#include "sync.h"
-
-#include <fcntl.h>
-#include <ctype.h>
-#include <errno.h>
-#include <sys/stat.h>
-
-#define JOURNAL_VERSION "4"
+#include "sync_p.h"
 
 channel_conf_t global_conf;
 channel_conf_t *channels;
@@ -26,75 +17,6 @@ int new_total[2], new_done[2];
 int flags_total[2], flags_done[2];
 int trash_total[2], trash_done[2];
 
-const char *str_fn[] = { "far side", "near side" }, *str_hl[] = { "push", 
"pull" };
-
-
-static uchar
-parse_flags( const char *buf )
-{
-       uint i, d;
-       uchar flags;
-
-       for (flags = i = d = 0; i < as(MsgFlags); i++) {
-               if (buf[d] == MsgFlags[i]) {
-                       flags |= (1 << i);
-                       d++;
-               }
-       }
-       return flags;
-}
-
-// This is the (mostly) persistent status of the sync record.
-// Most of these bits are actually mutually exclusive. It is a
-// bitfield to allow for easy testing for multiple states.
-#define S_EXPIRE       (1<<0)  // the entry is being expired (near side 
message removal scheduled)
-#define S_EXPIRED      (1<<1)  // the entry is expired (near side message 
removal confirmed)
-#define S_PENDING      (1<<2)  // the entry is new and awaits propagation 
(possibly a retry)
-#define S_DUMMY(fn)    (1<<(3+(fn)))  // f/n message is only a placeholder
-#define S_SKIPPED      (1<<5)  // pre-1.4 legacy: the entry was not propagated 
(message is too big)
-#define S_DEAD         (1<<7)  // ephemeral: the entry was killed and should 
be ignored
-
-// Ephemeral working set.
-#define W_NEXPIRE      (1<<0)  // temporary: new expiration state
-#define W_DELETE       (1<<1)  // ephemeral: flags propagation is a deletion
-#define W_DEL(fn)      (1<<(2+(fn)))  // ephemeral: f/n message would be 
subject to expunge
-#define W_UPGRADE      (1<<4)  // ephemeral: upgrading placeholder, do not 
apply MaxSize
-#define W_PURGE        (1<<5)  // ephemeral: placeholder is being nuked
-
-typedef struct sync_rec {
-       struct sync_rec *next;
-       /* string_list_t *keywords; */
-       uint uid[2];
-       message_t *msg[2];
-       uchar status, wstate, flags, pflags, aflags[2], dflags[2];
-       char tuid[TUIDL];
-} sync_rec_t;
-
-typedef struct {
-       int t[2];
-       void (*cb)( int sts, void *aux ), *aux;
-       char *dname, *jname, *nname, *lname, *box_name[2];
-       FILE *jfp, *nfp;
-       sync_rec_t *srecs, **srecadd;
-       channel_conf_t *chan;
-       store_t *ctx[2];
-       driver_t *drv[2];
-       const char *orig_name[2];
-       message_t *msgs[2], *new_msgs[2];
-       uint_array_alloc_t trashed_msgs[2];
-       int state[2], lfd, ret, existing, replayed;
-       uint ref_count, nsrecs, opts[2];
-       uint new_pending[2], flags_pending[2], trash_pending[2];
-       uint maxuid[2];     // highest UID that was already propagated
-       uint oldmaxuid[2];  // highest UID that was already propagated before 
this run
-       uint uidval[2];     // UID validity value
-       uint newuidval[2];  // UID validity obtained from driver
-       uint finduid[2];    // TUID lookup makes sense only for UIDs >= this
-       uint maxxfuid;      // highest expired UID on far side
-       uint oldmaxxfuid;   // highest expired UID on far side before this run
-       uchar good_flags[2], bad_flags[2];
-} sync_vars_t;
-
 static void sync_ref( sync_vars_t *svars ) { ++svars->ref_count; }
 static void sync_deref( sync_vars_t *svars );
 static int check_cancel( sync_vars_t *svars );
@@ -139,124 +61,6 @@ static int check_cancel( sync_vars_t *svars );
 #define ST_SENDING_NEW     (1<<15)
 
 
-static void
-create_state( sync_vars_t *svars )
-{
-       if (!(svars->nfp = fopen( svars->nname, "w" ))) {
-               sys_error( "Error: cannot create new sync state %s", 
svars->nname );
-               exit( 1 );
-       }
-}
-
-static void ATTR_PRINTFLIKE(2, 3)
-jFprintf( sync_vars_t *svars, const char *msg, ... )
-{
-       va_list va;
-
-       if (JLimit && !--JLimit)
-               exit( 101 );
-       if (!svars->jfp) {
-               create_state( svars );
-               if (!(svars->jfp = fopen( svars->jname, svars->replayed ? "a" : 
"w" ))) {
-                       sys_error( "Error: cannot create journal %s", 
svars->jname );
-                       exit( 1 );
-               }
-               setlinebuf( svars->jfp );
-               if (!svars->replayed)
-                       Fprintf( svars->jfp, JOURNAL_VERSION "\n" );
-       }
-       va_start( va, msg );
-       vFprintf( svars->jfp, msg, va );
-       va_end( va );
-       if (JLimit && !--JLimit)
-               exit( 100 );
-}
-
-#define JLOG_(log_fmt, log_args, dbg_fmt, ...) \
-       do { \
-               debug( "-> log: " log_fmt " (" dbg_fmt ")\n", __VA_ARGS__ ); \
-               jFprintf( svars, log_fmt "\n", deparen(log_args) ); \
-       } while (0)
-#define JLOG3(log_fmt, log_args, dbg_fmt) \
-       JLOG_(log_fmt, log_args, dbg_fmt, deparen(log_args))
-#define JLOG4(log_fmt, log_args, dbg_fmt, dbg_args) \
-       JLOG_(log_fmt, log_args, dbg_fmt, deparen(log_args), deparen(dbg_args))
-#define JLOG_SEL(_1, _2, _3, _4, x, ...) x
-#define JLOG(...) JLOG_SEL(__VA_ARGS__, JLOG4, JLOG3, NO_JLOG2, 
NO_JLOG1)(__VA_ARGS__)
-
-static void
-assign_uid( sync_vars_t *svars, sync_rec_t *srec, int t, uint uid )
-{
-       srec->uid[t] = uid;
-       if (uid == svars->maxuid[t] + 1)
-               svars->maxuid[t] = uid;
-       srec->status &= ~S_PENDING;
-       srec->wstate &= ~W_UPGRADE;
-       srec->tuid[0] = 0;
-}
-
-#define ASSIGN_UID(srec, t, nuid, ...) \
-       do { \
-               JLOG( "%c %u %u %u", ("<>"[t], srec->uid[F], srec->uid[N], 
nuid), __VA_ARGS__ ); \
-               assign_uid( svars, srec, t, nuid ); \
-       } while (0)
-
-static void
-assign_tuid( sync_vars_t *svars, sync_rec_t *srec )
-{
-       for (uint i = 0; i < TUIDL; i++) {
-               uchar c = arc4_getbyte() & 0x3f;
-               srec->tuid[i] = (char)(c < 26 ? c + 'A' : c < 52 ? c + 'a' - 26 
:
-                                      c < 62 ? c + '0' - 52 : c == 62 ? '+' : 
'/');
-       }
-       JLOG( "# %u %u %." stringify(TUIDL) "s", (srec->uid[F], srec->uid[N], 
srec->tuid), "new TUID" );
-}
-
-static int
-match_tuids( sync_vars_t *svars, int t, message_t *msgs )
-{
-       sync_rec_t *srec;
-       message_t *tmsg, *ntmsg = NULL;
-       const char *diag;
-       int num_lost = 0;
-
-       for (srec = svars->srecs; srec; srec = srec->next) {
-               if (srec->status & S_DEAD)
-                       continue;
-               if (!srec->uid[t] && srec->tuid[0]) {
-                       debug( "pair(%u,%u) TUID %." stringify(TUIDL) "s\n", 
srec->uid[F], srec->uid[N], srec->tuid );
-                       for (tmsg = ntmsg; tmsg; tmsg = tmsg->next) {
-                               if (tmsg->status & M_DEAD)
-                                       continue;
-                               if (tmsg->tuid[0] && !memcmp( tmsg->tuid, 
srec->tuid, TUIDL )) {
-                                       diag = (tmsg == ntmsg) ? "adjacently" : 
"after gap";
-                                       goto mfound;
-                               }
-                       }
-                       for (tmsg = msgs; tmsg != ntmsg; tmsg = tmsg->next) {
-                               if (tmsg->status & M_DEAD)
-                                       continue;
-                               if (tmsg->tuid[0] && !memcmp( tmsg->tuid, 
srec->tuid, TUIDL )) {
-                                       diag = "after reset";
-                                       goto mfound;
-                               }
-                       }
-                       JLOG( "& %u %u", (srec->uid[F], srec->uid[N]), "TUID 
lost" );
-                       // Note: status remains S_PENDING.
-                       srec->tuid[0] = 0;
-                       num_lost++;
-                       continue;
-                 mfound:
-                       tmsg->srec = srec;
-                       srec->msg[t] = tmsg;
-                       ntmsg = tmsg->next;
-                       ASSIGN_UID( srec, t, tmsg->uid, "TUID matched %s", diag 
);
-               }
-       }
-       return num_lost;
-}
-
-
 static uchar
 sanitize_flags( uchar tflags, sync_vars_t *svars, int t )
 {
@@ -627,452 +431,6 @@ check_ret( int sts, void *aux )
        } \
        INIT_SVARS(vars->aux)
 
-static char *
-clean_strdup( const char *s )
-{
-       char *cs;
-       uint i;
-
-       cs = nfstrdup( s );
-       for (i = 0; cs[i]; i++)
-               if (cs[i] == '/')
-                       cs[i] = '!';
-       return cs;
-}
-
-
-static sync_rec_t *
-upgrade_srec( sync_vars_t *svars, sync_rec_t *srec )
-{
-       // Create an entry and append it to the current one.
-       sync_rec_t *nsrec = nfzalloc( sizeof(*nsrec) );
-       nsrec->next = srec->next;
-       srec->next = nsrec;
-       if (svars->srecadd == &srec->next)
-               svars->srecadd = &nsrec->next;
-       // Move the placeholder to the new entry.
-       int t = (srec->status & S_DUMMY(F)) ? F : N;
-       nsrec->uid[t] = srec->uid[t];
-       srec->uid[t] = 0;
-       if (srec->msg[t]) {  // NULL during journal replay; is assigned later.
-               nsrec->msg[t] = srec->msg[t];
-               nsrec->msg[t]->srec = nsrec;
-               srec->msg[t] = NULL;
-       }
-       // Mark the original entry for upgrade.
-       srec->status = (srec->status & ~(S_DUMMY(F)|S_DUMMY(N))) | S_PENDING;
-       srec->wstate |= W_UPGRADE;
-       // Mark the placeholder for nuking.
-       nsrec->wstate = W_PURGE;
-       nsrec->aflags[t] = F_DELETED;
-       return nsrec;
-}
-
-static int
-prepare_state( sync_vars_t *svars )
-{
-       char *s, *cmname, *csname;
-       channel_conf_t *chan;
-
-       chan = svars->chan;
-       if (!strcmp( chan->sync_state ? chan->sync_state : 
global_conf.sync_state, "*" )) {
-               const char *path = svars->drv[N]->get_box_path( svars->ctx[N] );
-               if (!path) {
-                       error( "Error: store '%s' does not support in-box sync 
state\n", chan->stores[N]->name );
-                       return 0;
-               }
-               nfasprintf( &svars->dname, "%s/." EXE "state", path );
-       } else {
-               csname = clean_strdup( svars->box_name[N] );
-               if (chan->sync_state) {
-                       nfasprintf( &svars->dname, "%s%s", chan->sync_state, 
csname );
-               } else {
-                       char c = FieldDelimiter;
-                       cmname = clean_strdup( svars->box_name[F] );
-                       nfasprintf( &svars->dname, "%s%c%s%c%s_%c%s%c%s", 
global_conf.sync_state,
-                                   c, chan->stores[F]->name, c, cmname, c, 
chan->stores[N]->name, c, csname );
-                       free( cmname );
-               }
-               free( csname );
-               if (!(s = strrchr( svars->dname, '/' ))) {
-                       error( "Error: invalid SyncState location '%s'\n", 
svars->dname );
-                       return 0;
-               }
-               *s = 0;
-               if (mkdir( svars->dname, 0700 ) && errno != EEXIST) {
-                       sys_error( "Error: cannot create SyncState directory 
'%s'", svars->dname );
-                       return 0;
-               }
-               *s = '/';
-       }
-       nfasprintf( &svars->jname, "%s.journal", svars->dname );
-       nfasprintf( &svars->nname, "%s.new", svars->dname );
-       nfasprintf( &svars->lname, "%s.lock", svars->dname );
-       return 1;
-}
-
-static int
-lock_state( sync_vars_t *svars )
-{
-       struct flock lck;
-
-       if (svars->lfd >= 0)
-               return 1;
-       memset( &lck, 0, sizeof(lck) );
-#if SEEK_SET != 0
-       lck.l_whence = SEEK_SET;
-#endif
-#if F_WRLCK != 0
-       lck.l_type = F_WRLCK;
-#endif
-       if ((svars->lfd = open( svars->lname, O_WRONLY|O_CREAT, 0666 )) < 0) {
-               sys_error( "Error: cannot create lock file %s", svars->lname );
-               return 0;
-       }
-       if (fcntl( svars->lfd, F_SETLK, &lck )) {
-               error( "Error: channel :%s:%s-:%s:%s is locked\n",
-                      svars->chan->stores[F]->name, svars->orig_name[F], 
svars->chan->stores[N]->name, svars->orig_name[N] );
-               close( svars->lfd );
-               svars->lfd = -1;
-               return 0;
-       }
-       return 1;
-}
-
-static void
-save_state( sync_vars_t *svars )
-{
-       sync_rec_t *srec;
-       char fbuf[16]; /* enlarge when support for keywords is added */
-
-       // If no change was made, the state is also unmodified.
-       if (!svars->jfp && !svars->replayed)
-               return;
-
-       if (!svars->nfp)
-               create_state( svars );
-       Fprintf( svars->nfp,
-                "FarUidValidity %u\nNearUidValidity %u\nMaxPulledUid 
%u\nMaxPushedUid %u\n",
-                svars->uidval[F], svars->uidval[N], svars->maxuid[F], 
svars->maxuid[N] );
-       if (svars->maxxfuid)
-               Fprintf( svars->nfp, "MaxExpiredFarUid %u\n", svars->maxxfuid );
-       Fprintf( svars->nfp, "\n" );
-       for (srec = svars->srecs; srec; srec = srec->next) {
-               if (srec->status & S_DEAD)
-                       continue;
-               make_flags( srec->flags, fbuf );
-               Fprintf( svars->nfp, "%u %u %s%s%s\n", srec->uid[F], 
srec->uid[N],
-                        (srec->status & S_DUMMY(F)) ? "<" : (srec->status & 
S_DUMMY(N)) ? ">" : "",
-                        (srec->status & S_SKIPPED) ? "^" : (srec->status & 
S_EXPIRED) ? "~" : "", fbuf );
-       }
-
-       Fclose( svars->nfp, 1 );
-       if (svars->jfp)
-               Fclose( svars->jfp, 0 );
-       if (!(DFlags & KEEPJOURNAL)) {
-               /* order is important! */
-               if (rename( svars->nname, svars->dname ))
-                       warn( "Warning: cannot commit sync state %s\n", 
svars->dname );
-               else if (unlink( svars->jname ))
-                       warn( "Warning: cannot delete journal %s\n", 
svars->jname );
-       }
-}
-
-static int
-load_state( sync_vars_t *svars )
-{
-       sync_rec_t *srec, *nsrec;
-       char *s;
-       FILE *jfp;
-       uint ll;
-       uint maxxnuid = 0;
-       char c;
-       struct stat st;
-       char fbuf[16]; /* enlarge when support for keywords is added */
-       char buf[128], buf1[64], buf2[64];
-
-       if ((jfp = fopen( svars->dname, "r" ))) {
-               if (!lock_state( svars ))
-                       goto jbail;
-               debug( "reading sync state %s ...\n", svars->dname );
-               int line = 0;
-               while (fgets( buf, sizeof(buf), jfp )) {
-                       line++;
-                       if (!(ll = strlen( buf )) || buf[ll - 1] != '\n') {
-                               error( "Error: incomplete sync state header 
entry at %s:%d\n", svars->dname, line );
-                         jbail:
-                               fclose( jfp );
-                               return 0;
-                       }
-                       if (ll == 1)
-                               goto gothdr;
-                       if (line == 1 && isdigit( buf[0] )) {  // Pre-1.1 legacy
-                               if (sscanf( buf, "%63s %63s", buf1, buf2 ) != 2 
||
-                                   sscanf( buf1, "%u:%u", &svars->uidval[F], 
&svars->maxuid[F] ) < 2 ||
-                                   sscanf( buf2, "%u:%u:%u", 
&svars->uidval[N], &maxxnuid, &svars->maxuid[N] ) < 3) {
-                                       error( "Error: invalid sync state 
header in %s\n", svars->dname );
-                                       goto jbail;
-                               }
-                               goto gothdr;
-                       }
-                       uint uid;
-                       if (sscanf( buf, "%63s %u", buf1, &uid ) != 2) {
-                               error( "Error: malformed sync state header 
entry at %s:%d\n", svars->dname, line );
-                               goto jbail;
-                       }
-                       if (!strcmp( buf1, "FarUidValidity" ) || !strcmp( buf1, 
"MasterUidValidity" ) /* Pre-1.4 legacy */) {
-                               svars->uidval[F] = uid;
-                       } else if (!strcmp( buf1, "NearUidValidity" ) || 
!strcmp( buf1, "SlaveUidValidity" ) /* Pre-1.4 legacy */) {
-                               svars->uidval[N] = uid;
-                       } else if (!strcmp( buf1, "MaxPulledUid" )) {
-                               svars->maxuid[F] = uid;
-                       } else if (!strcmp( buf1, "MaxPushedUid" )) {
-                               svars->maxuid[N] = uid;
-                       } else if (!strcmp( buf1, "MaxExpiredFarUid" ) || 
!strcmp( buf1, "MaxExpiredMasterUid" ) /* Pre-1.4 legacy */) {
-                               svars->maxxfuid = uid;
-                       } else if (!strcmp( buf1, "MaxExpiredSlaveUid" )) {  // 
Pre-1.3 legacy
-                               maxxnuid = uid;
-                       } else {
-                               error( "Error: unrecognized sync state header 
entry at %s:%d\n", svars->dname, line );
-                               goto jbail;
-                       }
-               }
-               error( "Error: unterminated sync state header in %s\n", 
svars->dname );
-               goto jbail;
-         gothdr:
-               while (fgets( buf, sizeof(buf), jfp )) {
-                       line++;
-                       if (!(ll = strlen( buf )) || buf[--ll] != '\n') {
-                               error( "Error: incomplete sync state entry at 
%s:%d\n", svars->dname, line );
-                               goto jbail;
-                       }
-                       buf[ll] = 0;
-                       fbuf[0] = 0;
-                       uint t1, t2;
-                       if (sscanf( buf, "%u %u %15s", &t1, &t2, fbuf ) < 2) {
-                               error( "Error: invalid sync state entry at 
%s:%d\n", svars->dname, line );
-                               goto jbail;
-                       }
-                       srec = nfzalloc( sizeof(*srec) );
-                       srec->uid[F] = t1;
-                       srec->uid[N] = t2;
-                       s = fbuf;
-                       if (*s == '<') {
-                               s++;
-                               srec->status = S_DUMMY(F);
-                       } else if (*s == '>') {
-                               s++;
-                               srec->status = S_DUMMY(N);
-                       }
-                       if (*s == '^') {  // Pre-1.4 legacy
-                               s++;
-                               srec->status = S_SKIPPED;
-                       } else if (*s == '~' || *s == 'X' /* Pre-1.3 legacy */) 
{
-                               s++;
-                               srec->status = S_EXPIRE | S_EXPIRED;
-                       } else if (srec->uid[F] == (uint)-1) {  // Pre-1.3 
legacy
-                               srec->uid[F] = 0;
-                               srec->status = S_SKIPPED;
-                       } else if (srec->uid[N] == (uint)-1) {
-                               srec->uid[N] = 0;
-                               srec->status = S_SKIPPED;
-                       }
-                       srec->flags = parse_flags( s );
-                       debug( "  entry (%u,%u,%u,%s%s)\n", srec->uid[F], 
srec->uid[N], srec->flags,
-                              (srec->status & S_SKIPPED) ? "SKIP" : 
(srec->status & S_EXPIRED) ? "XPIRE" : "",
-                              (srec->status & S_DUMMY(F)) ? ",F-DUMMY" : 
(srec->status & S_DUMMY(N)) ? ",N-DUMMY" : "" );
-                       *svars->srecadd = srec;
-                       svars->srecadd = &srec->next;
-                       svars->nsrecs++;
-               }
-               fclose( jfp );
-               svars->existing = 1;
-       } else {
-               if (errno != ENOENT) {
-                       sys_error( "Error: cannot read sync state %s", 
svars->dname );
-                       return 0;
-               }
-               svars->existing = 0;
-       }
-
-       // This is legacy support for pre-1.3 sync states.
-       if (maxxnuid) {
-               uint minwuid = UINT_MAX;
-               for (srec = svars->srecs; srec; srec = srec->next) {
-                       if ((srec->status & (S_DEAD | S_SKIPPED | S_PENDING)) 
|| !srec->uid[F])
-                               continue;
-                       if (srec->status & S_EXPIRED) {
-                               if (!srec->uid[N]) {
-                                       // The expired message was already gone.
-                                       continue;
-                               }
-                               // The expired message was not expunged yet, so 
re-examine it.
-                               // This will happen en masse, so just extend 
the bulk fetch.
-                       } else {
-                               if (srec->uid[N] && maxxnuid >= srec->uid[N]) {
-                                       // The non-expired message is in the 
generally expired range,
-                                       // so don't make it contribute to the 
bulk fetch.
-                                       continue;
-                               }
-                               // Usual non-expired message.
-                       }
-                       if (minwuid > srec->uid[F])
-                               minwuid = srec->uid[F];
-               }
-               svars->maxxfuid = minwuid - 1;
-       }
-
-       int line = 0;
-       if ((jfp = fopen( svars->jname, "r" ))) {
-               if (!lock_state( svars ))
-                       goto jbail;
-               if (!stat( svars->nname, &st ) && fgets( buf, sizeof(buf), jfp 
)) {
-                       debug( "recovering journal ...\n" );
-                       if (!(ll = strlen( buf )) || buf[--ll] != '\n') {
-                               error( "Error: incomplete journal header in 
%s\n", svars->jname );
-                               goto jbail;
-                       }
-                       buf[ll] = 0;
-                       if (!equals( buf, (int)ll, JOURNAL_VERSION, 
strlen(JOURNAL_VERSION) )) {
-                               error( "Error: incompatible journal version "
-                                                "(got %s, expected " 
JOURNAL_VERSION ")\n", buf );
-                               goto jbail;
-                       }
-                       srec = NULL;
-                       line = 1;
-                       while (fgets( buf, sizeof(buf), jfp )) {
-                               line++;
-                               if (!(ll = strlen( buf )) || buf[--ll] != '\n') 
{
-                                       error( "Error: incomplete journal entry 
at %s:%d\n", svars->jname, line );
-                                       goto jbail;
-                               }
-                               buf[ll] = 0;
-                               int tn;
-                               uint t1, t2, t3, t4;
-                               if ((c = buf[0]) == '#' ?
-                                     (tn = 0, (sscanf( buf + 2, "%u %u %n", 
&t1, &t2, &tn ) < 2) || !tn || (ll - (uint)tn != TUIDL + 2)) :
-                                     c == '!' ?
-                                       (sscanf( buf + 2, "%u", &t1 ) != 1) :
-                                       c == 'N' || c == 'F' || c == 'T' || c 
== '+' || c == '&' || c == '-' || c == '=' || c == '_' || c == '|' ?
-                                         (sscanf( buf + 2, "%u %u", &t1, &t2 ) 
!= 2) :
-                                         c != '^' ?
-                                           (sscanf( buf + 2, "%u %u %u", &t1, 
&t2, &t3 ) != 3) :
-                                           (sscanf( buf + 2, "%u %u %u %u", 
&t1, &t2, &t3, &t4 ) != 4))
-                               {
-                                       error( "Error: malformed journal entry 
at %s:%d\n", svars->jname, line );
-                                       goto jbail;
-                               }
-                               if (c == 'N') {
-                                       svars->maxuid[t1] = t2;
-                               } else if (c == 'F') {
-                                       svars->finduid[t1] = t2;
-                               } else if (c == 'T') {
-                                       *uint_array_append( 
&svars->trashed_msgs[t1] ) = t2;
-                               } else if (c == '!') {
-                                       svars->maxxfuid = t1;
-                               } else if (c == '|') {
-                                       svars->uidval[F] = t1;
-                                       svars->uidval[N] = t2;
-                               } else if (c == '+') {
-                                       srec = nfzalloc( sizeof(*srec) );
-                                       srec->uid[F] = t1;
-                                       srec->uid[N] = t2;
-                                       debug( "  new entry(%u,%u)\n", t1, t2 );
-                                       srec->status = S_PENDING;
-                                       *svars->srecadd = srec;
-                                       svars->srecadd = &srec->next;
-                                       svars->nsrecs++;
-                               } else {
-                                       for (nsrec = srec; srec; srec = 
srec->next)
-                                               if (srec->uid[F] == t1 && 
srec->uid[N] == t2)
-                                                       goto syncfnd;
-                                       for (srec = svars->srecs; srec != 
nsrec; srec = srec->next)
-                                               if (srec->uid[F] == t1 && 
srec->uid[N] == t2)
-                                                       goto syncfnd;
-                                       error( "Error: journal entry at %s:%d 
refers to non-existing sync state entry\n", svars->jname, line );
-                                       goto jbail;
-                                 syncfnd:
-                                       debugn( "  entry(%u,%u,%u) ", 
srec->uid[F], srec->uid[N], srec->flags );
-                                       switch (c) {
-                                       case '-':
-                                               debug( "killed\n" );
-                                               srec->status = S_DEAD;
-                                               break;
-                                       case '=':
-                                               debug( "aborted\n" );
-                                               if (svars->maxxfuid < 
srec->uid[F])
-                                                       svars->maxxfuid = 
srec->uid[F];
-                                               srec->status = S_DEAD;
-                                               break;
-                                       case '#':
-                                               memcpy( srec->tuid, buf + tn + 
2, TUIDL );
-                                               debug( "TUID now %." 
stringify(TUIDL) "s\n", srec->tuid );
-                                               break;
-                                       case '&':
-                                               debug( "TUID %." 
stringify(TUIDL) "s lost\n", srec->tuid );
-                                               srec->tuid[0] = 0;
-                                               break;
-                                       case '<':
-                                               debug( "far side now %u\n", t3 
);
-                                               assign_uid( svars, srec, F, t3 
);
-                                               break;
-                                       case '>':
-                                               debug( "near side now %u\n", t3 
);
-                                               assign_uid( svars, srec, N, t3 
);
-                                               break;
-                                       case '*':
-                                               debug( "flags now %u\n", t3 );
-                                               srec->flags = (uchar)t3;
-                                               srec->aflags[F] = 
srec->aflags[N] = 0;  // Clear F_DELETED from purge
-                                               srec->wstate &= ~W_PURGE;
-                                               break;
-                                       case '~':
-                                               debug( "status now %#x\n", t3 );
-                                               srec->status = (uchar)t3;
-                                               break;
-                                       case '_':
-                                               debug( "has placeholder now\n" 
);
-                                               srec->status = S_PENDING;  // 
Pre-1.4 legacy only
-                                               srec->status |= !srec->uid[F] ? 
S_DUMMY(F) : S_DUMMY(N);
-                                               break;
-                                       case '^':
-                                               debug( "is being upgraded, 
flags %u, srec flags %u\n", t3, t4 );
-                                               srec->pflags = (uchar)t3;
-                                               srec->flags = (uchar)t4;
-                                               srec = upgrade_srec( svars, 
srec );
-                                               break;
-                                       default:
-                                               error( "Error: unrecognized 
journal entry at %s:%d\n", svars->jname, line );
-                                               goto jbail;
-                                       }
-                               }
-                       }
-               }
-               fclose( jfp );
-               sort_uint_array( svars->trashed_msgs[F].array );
-               sort_uint_array( svars->trashed_msgs[N].array );
-       } else {
-               if (errno != ENOENT) {
-                       sys_error( "Error: cannot read journal %s", 
svars->jname );
-                       return 0;
-               }
-       }
-       svars->replayed = line;
-
-       return 1;
-}
-
-static void
-delete_state( sync_vars_t *svars )
-{
-       unlink( svars->nname );
-       unlink( svars->jname );
-       if (unlink( svars->dname ) || unlink( svars->lname )) {
-               sys_error( "Error: channel %s: sync state cannot be deleted", 
svars->chan->name );
-               svars->ret = SYNC_FAIL;
-       }
-}
-
 static void box_confirmed( int sts, uint uidvalidity, void *aux );
 static void box_confirmed2( sync_vars_t *svars, int t );
 static void box_deleted( int sts, void *aux );
diff --git a/src/sync_p.h b/src/sync_p.h
new file mode 100644
index 00000000..07eefcbf
--- /dev/null
+++ b/src/sync_p.h
@@ -0,0 +1,93 @@
+// SPDX-FileCopyrightText: 2002-2022 Oswald Buddenhagen <o...@users.sf.net>
+// SPDX-License-Identifier: GPL-2.0-or-later WITH 
LicenseRef-isync-GPL-exception
+//
+// mbsync - mailbox synchronizer
+//
+
+#define DEBUG_FLAG DEBUG_SYNC
+
+#include "sync.h"
+
+// This is the (mostly) persistent status of the sync record.
+// Most of these bits are actually mutually exclusive. It is a
+// bitfield to allow for easy testing for multiple states.
+#define S_EXPIRE       (1<<0)  // the entry is being expired (near side 
message removal scheduled)
+#define S_EXPIRED      (1<<1)  // the entry is expired (near side message 
removal confirmed)
+#define S_PENDING      (1<<2)  // the entry is new and awaits propagation 
(possibly a retry)
+#define S_DUMMY(fn)    (1<<(3+(fn)))  // f/n message is only a placeholder
+#define S_SKIPPED      (1<<5)  // pre-1.4 legacy: the entry was not propagated 
(message is too big)
+#define S_DEAD         (1<<7)  // ephemeral: the entry was killed and should 
be ignored
+
+// Ephemeral working set.
+#define W_NEXPIRE      (1<<0)  // temporary: new expiration state
+#define W_DELETE       (1<<1)  // ephemeral: flags propagation is a deletion
+#define W_DEL(fn)      (1<<(2+(fn)))  // ephemeral: f/n message would be 
subject to expunge
+#define W_UPGRADE      (1<<4)  // ephemeral: upgrading placeholder, do not 
apply MaxSize
+#define W_PURGE        (1<<5)  // ephemeral: placeholder is being nuked
+
+typedef struct sync_rec {
+       struct sync_rec *next;
+       /* string_list_t *keywords; */
+       uint uid[2];
+       message_t *msg[2];
+       uchar status, wstate, flags, pflags, aflags[2], dflags[2];
+       char tuid[TUIDL];
+} sync_rec_t;
+
+typedef struct {
+       int t[2];
+       void (*cb)( int sts, void *aux ), *aux;
+       char *dname, *jname, *nname, *lname, *box_name[2];
+       FILE *jfp, *nfp;
+       sync_rec_t *srecs, **srecadd;
+       channel_conf_t *chan;
+       store_t *ctx[2];
+       driver_t *drv[2];
+       const char *orig_name[2];
+       message_t *msgs[2], *new_msgs[2];
+       uint_array_alloc_t trashed_msgs[2];
+       int state[2], lfd, ret, existing, replayed;
+       uint ref_count, nsrecs, opts[2];
+       uint new_pending[2], flags_pending[2], trash_pending[2];
+       uint maxuid[2];     // highest UID that was already propagated
+       uint oldmaxuid[2];  // highest UID that was already propagated before 
this run
+       uint uidval[2];     // UID validity value
+       uint newuidval[2];  // UID validity obtained from driver
+       uint finduid[2];    // TUID lookup makes sense only for UIDs >= this
+       uint maxxfuid;      // highest expired UID on far side
+       uint oldmaxxfuid;   // highest expired UID on far side before this run
+       uchar good_flags[2], bad_flags[2];
+} sync_vars_t;
+
+int prepare_state( sync_vars_t *svars );
+int lock_state( sync_vars_t *svars );
+int load_state( sync_vars_t *svars );
+void save_state( sync_vars_t *svars );
+void delete_state( sync_vars_t *svars );
+
+void ATTR_PRINTFLIKE(2, 3) jFprintf( sync_vars_t *svars, const char *msg, ... 
);
+
+#define JLOG_(log_fmt, log_args, dbg_fmt, ...) \
+       do { \
+               debug( "-> log: " log_fmt " (" dbg_fmt ")\n", __VA_ARGS__ ); \
+               jFprintf( svars, log_fmt "\n", deparen(log_args) ); \
+       } while (0)
+#define JLOG3(log_fmt, log_args, dbg_fmt) \
+       JLOG_(log_fmt, log_args, dbg_fmt, deparen(log_args))
+#define JLOG4(log_fmt, log_args, dbg_fmt, dbg_args) \
+       JLOG_(log_fmt, log_args, dbg_fmt, deparen(log_args), deparen(dbg_args))
+#define JLOG_SEL(_1, _2, _3, _4, x, ...) x
+#define JLOG(...) JLOG_SEL(__VA_ARGS__, JLOG4, JLOG3, NO_JLOG2, 
NO_JLOG1)(__VA_ARGS__)
+
+void assign_uid( sync_vars_t *svars, sync_rec_t *srec, int t, uint uid );
+
+#define ASSIGN_UID(srec, t, nuid, ...) \
+       do { \
+               JLOG( "%c %u %u %u", ("<>"[t], srec->uid[F], srec->uid[N], 
nuid), __VA_ARGS__ ); \
+               assign_uid( svars, srec, t, nuid ); \
+       } while (0)
+
+void assign_tuid( sync_vars_t *svars, sync_rec_t *srec );
+int match_tuids( sync_vars_t *svars, int t, message_t *msgs );
+
+sync_rec_t *upgrade_srec( sync_vars_t *svars, sync_rec_t *srec );
diff --git a/src/sync_state.c b/src/sync_state.c
new file mode 100644
index 00000000..c6761e97
--- /dev/null
+++ b/src/sync_state.c
@@ -0,0 +1,569 @@
+// SPDX-FileCopyrightText: 2004-2022 Oswald Buddenhagen <o...@users.sf.net>
+// SPDX-License-Identifier: GPL-2.0-or-later WITH 
LicenseRef-isync-GPL-exception
+//
+// mbsync - mailbox synchronizer
+//
+
+#define DEBUG_FLAG DEBUG_SYNC
+
+#include "sync_p.h"
+
+#include <fcntl.h>
+#include <ctype.h>
+#include <errno.h>
+#include <sys/stat.h>
+
+#define JOURNAL_VERSION "4"
+
+const char *str_fn[] = { "far side", "near side" }, *str_hl[] = { "push", 
"pull" };
+
+static char *
+clean_strdup( const char *s )
+{
+       char *cs = nfstrdup( s );
+       for (uint i = 0; cs[i]; i++)
+               if (cs[i] == '/')
+                       cs[i] = '!';
+       return cs;
+}
+
+int
+prepare_state( sync_vars_t *svars )
+{
+       channel_conf_t *chan = svars->chan;
+       if (!strcmp( chan->sync_state ? chan->sync_state : 
global_conf.sync_state, "*" )) {
+               const char *path = svars->drv[N]->get_box_path( svars->ctx[N] );
+               if (!path) {
+                       error( "Error: store '%s' does not support in-box sync 
state\n", chan->stores[N]->name );
+                       return 0;
+               }
+               nfasprintf( &svars->dname, "%s/." EXE "state", path );
+       } else {
+               char *cnname = clean_strdup( svars->box_name[N] );
+               if (chan->sync_state) {
+                       nfasprintf( &svars->dname, "%s%s", chan->sync_state, 
cnname );
+               } else {
+                       char c = FieldDelimiter;
+                       char *cfname = clean_strdup( svars->box_name[F] );
+                       nfasprintf( &svars->dname, "%s%c%s%c%s_%c%s%c%s", 
global_conf.sync_state,
+                                   c, chan->stores[F]->name, c, cfname, c, 
chan->stores[N]->name, c, cnname );
+                       free( cfname );
+               }
+               free( cnname );
+               char *s;
+               if (!(s = strrchr( svars->dname, '/' ))) {
+                       error( "Error: invalid SyncState location '%s'\n", 
svars->dname );
+                       return 0;
+               }
+               // Note that this may be shorter than the configuration value,
+               // as that may contain a filename prefix.
+               *s = 0;
+               if (mkdir( svars->dname, 0700 ) && errno != EEXIST) {
+                       sys_error( "Error: cannot create SyncState directory 
'%s'", svars->dname );
+                       return 0;
+               }
+               *s = '/';
+       }
+       nfasprintf( &svars->jname, "%s.journal", svars->dname );
+       nfasprintf( &svars->nname, "%s.new", svars->dname );
+       nfasprintf( &svars->lname, "%s.lock", svars->dname );
+       return 1;
+}
+
+int
+lock_state( sync_vars_t *svars )
+{
+       struct flock lck;
+
+       if (svars->lfd >= 0)
+               return 1;
+       memset( &lck, 0, sizeof(lck) );
+#if SEEK_SET != 0
+       lck.l_whence = SEEK_SET;
+#endif
+#if F_WRLCK != 0
+       lck.l_type = F_WRLCK;
+#endif
+       if ((svars->lfd = open( svars->lname, O_WRONLY | O_CREAT, 0666 )) < 0) {
+               sys_error( "Error: cannot create lock file %s", svars->lname );
+               return 0;
+       }
+       if (fcntl( svars->lfd, F_SETLK, &lck )) {
+               error( "Error: channel :%s:%s-:%s:%s is locked\n",
+                      svars->chan->stores[F]->name, svars->orig_name[F], 
svars->chan->stores[N]->name, svars->orig_name[N] );
+               close( svars->lfd );
+               svars->lfd = -1;
+               return 0;
+       }
+       return 1;
+}
+
+static uchar
+parse_flags( const char *buf )
+{
+       uchar flags = 0;
+       for (uint i = 0, d = 0; i < as(MsgFlags); i++) {
+               if (buf[d] == MsgFlags[i]) {
+                       flags |= (1 << i);
+                       d++;
+               }
+       }
+       return flags;
+}
+
+int
+load_state( sync_vars_t *svars )
+{
+       sync_rec_t *srec, *nsrec;
+       FILE *jfp;
+       uint ll;
+       uint maxxnuid = 0;
+       char fbuf[16];  // enlarge when support for keywords is added
+       char buf[128], buf1[64], buf2[64];
+
+       if ((jfp = fopen( svars->dname, "r" ))) {
+               if (!lock_state( svars ))
+                       goto jbail;
+               debug( "reading sync state %s ...\n", svars->dname );
+               int line = 0;
+               while (fgets( buf, sizeof(buf), jfp )) {
+                       line++;
+                       if (!(ll = strlen( buf )) || buf[ll - 1] != '\n') {
+                               error( "Error: incomplete sync state header 
entry at %s:%d\n", svars->dname, line );
+                         jbail:
+                               fclose( jfp );
+                               return 0;
+                       }
+                       if (ll == 1)
+                               goto gothdr;
+                       if (line == 1 && isdigit( buf[0] )) {  // Pre-1.1 legacy
+                               if (sscanf( buf, "%63s %63s", buf1, buf2 ) != 2 
||
+                                   sscanf( buf1, "%u:%u", &svars->uidval[F], 
&svars->maxuid[F] ) < 2 ||
+                                   sscanf( buf2, "%u:%u:%u", 
&svars->uidval[N], &maxxnuid, &svars->maxuid[N] ) < 3) {
+                                       error( "Error: invalid sync state 
header in %s\n", svars->dname );
+                                       goto jbail;
+                               }
+                               goto gothdr;
+                       }
+                       uint uid;
+                       if (sscanf( buf, "%63s %u", buf1, &uid ) != 2) {
+                               error( "Error: malformed sync state header 
entry at %s:%d\n", svars->dname, line );
+                               goto jbail;
+                       }
+                       if (!strcmp( buf1, "FarUidValidity" ) || !strcmp( buf1, 
"MasterUidValidity" ) /* Pre-1.4 legacy */) {
+                               svars->uidval[F] = uid;
+                       } else if (!strcmp( buf1, "NearUidValidity" ) || 
!strcmp( buf1, "SlaveUidValidity" ) /* Pre-1.4 legacy */) {
+                               svars->uidval[N] = uid;
+                       } else if (!strcmp( buf1, "MaxPulledUid" )) {
+                               svars->maxuid[F] = uid;
+                       } else if (!strcmp( buf1, "MaxPushedUid" )) {
+                               svars->maxuid[N] = uid;
+                       } else if (!strcmp( buf1, "MaxExpiredFarUid" ) || 
!strcmp( buf1, "MaxExpiredMasterUid" ) /* Pre-1.4 legacy */) {
+                               svars->maxxfuid = uid;
+                       } else if (!strcmp( buf1, "MaxExpiredSlaveUid" )) {  // 
Pre-1.3 legacy
+                               maxxnuid = uid;
+                       } else {
+                               error( "Error: unrecognized sync state header 
entry at %s:%d\n", svars->dname, line );
+                               goto jbail;
+                       }
+               }
+               error( "Error: unterminated sync state header in %s\n", 
svars->dname );
+               goto jbail;
+         gothdr:
+               while (fgets( buf, sizeof(buf), jfp )) {
+                       line++;
+                       if (!(ll = strlen( buf )) || buf[--ll] != '\n') {
+                               error( "Error: incomplete sync state entry at 
%s:%d\n", svars->dname, line );
+                               goto jbail;
+                       }
+                       buf[ll] = 0;
+                       fbuf[0] = 0;
+                       uint t1, t2;
+                       if (sscanf( buf, "%u %u %15s", &t1, &t2, fbuf ) < 2) {
+                               error( "Error: invalid sync state entry at 
%s:%d\n", svars->dname, line );
+                               goto jbail;
+                       }
+                       srec = nfzalloc( sizeof(*srec) );
+                       srec->uid[F] = t1;
+                       srec->uid[N] = t2;
+                       char *s = fbuf;
+                       if (*s == '<') {
+                               s++;
+                               srec->status = S_DUMMY(F);
+                       } else if (*s == '>') {
+                               s++;
+                               srec->status = S_DUMMY(N);
+                       }
+                       if (*s == '^') {  // Pre-1.4 legacy
+                               s++;
+                               srec->status = S_SKIPPED;
+                       } else if (*s == '~' || *s == 'X' /* Pre-1.3 legacy */) 
{
+                               s++;
+                               srec->status = S_EXPIRE | S_EXPIRED;
+                       } else if (srec->uid[F] == (uint)-1) {  // Pre-1.3 
legacy
+                               srec->uid[F] = 0;
+                               srec->status = S_SKIPPED;
+                       } else if (srec->uid[N] == (uint)-1) {
+                               srec->uid[N] = 0;
+                               srec->status = S_SKIPPED;
+                       }
+                       srec->flags = parse_flags( s );
+                       debug( "  entry (%u,%u,%u,%s%s)\n", srec->uid[F], 
srec->uid[N], srec->flags,
+                              (srec->status & S_SKIPPED) ? "SKIP" : 
(srec->status & S_EXPIRED) ? "XPIRE" : "",
+                              (srec->status & S_DUMMY(F)) ? ",F-DUMMY" : 
(srec->status & S_DUMMY(N)) ? ",N-DUMMY" : "" );
+                       *svars->srecadd = srec;
+                       svars->srecadd = &srec->next;
+                       svars->nsrecs++;
+               }
+               fclose( jfp );
+               svars->existing = 1;
+       } else {
+               if (errno != ENOENT) {
+                       sys_error( "Error: cannot read sync state %s", 
svars->dname );
+                       return 0;
+               }
+               svars->existing = 0;
+       }
+
+       // This is legacy support for pre-1.3 sync states.
+       if (maxxnuid) {
+               uint minwuid = UINT_MAX;
+               for (srec = svars->srecs; srec; srec = srec->next) {
+                       if ((srec->status & (S_DEAD | S_SKIPPED | S_PENDING)) 
|| !srec->uid[F])
+                               continue;
+                       if (srec->status & S_EXPIRED) {
+                               if (!srec->uid[N]) {
+                                       // The expired message was already gone.
+                                       continue;
+                               }
+                               // The expired message was not expunged yet, so 
re-examine it.
+                               // This will happen en masse, so just extend 
the bulk fetch.
+                       } else {
+                               if (srec->uid[N] && maxxnuid >= srec->uid[N]) {
+                                       // The non-expired message is in the 
generally expired range,
+                                       // so don't make it contribute to the 
bulk fetch.
+                                       continue;
+                               }
+                               // Usual non-expired message.
+                       }
+                       if (minwuid > srec->uid[F])
+                               minwuid = srec->uid[F];
+               }
+               svars->maxxfuid = minwuid - 1;
+       }
+
+       int line = 0;
+       if ((jfp = fopen( svars->jname, "r" ))) {
+               if (!lock_state( svars ))
+                       goto jbail;
+               struct stat st;
+               if (!stat( svars->nname, &st ) && fgets( buf, sizeof(buf), jfp 
)) {
+                       debug( "recovering journal ...\n" );
+                       if (!(ll = strlen( buf )) || buf[--ll] != '\n') {
+                               error( "Error: incomplete journal header in 
%s\n", svars->jname );
+                               goto jbail;
+                       }
+                       buf[ll] = 0;
+                       if (!equals( buf, (int)ll, JOURNAL_VERSION, 
strlen(JOURNAL_VERSION) )) {
+                               error( "Error: incompatible journal version"
+                                      " (got %s, expected " JOURNAL_VERSION 
")\n", buf );
+                               goto jbail;
+                       }
+                       srec = NULL;
+                       line = 1;
+                       while (fgets( buf, sizeof(buf), jfp )) {
+                               line++;
+                               if (!(ll = strlen( buf )) || buf[--ll] != '\n') 
{
+                                       error( "Error: incomplete journal entry 
at %s:%d\n", svars->jname, line );
+                                       goto jbail;
+                               }
+                               buf[ll] = 0;
+                               char c;
+                               int tn;
+                               uint t1, t2, t3, t4;
+                               if ((c = buf[0]) == '#' ?
+                                     (tn = 0, (sscanf( buf + 2, "%u %u %n", 
&t1, &t2, &tn ) < 2) || !tn || (ll - (uint)tn != TUIDL + 2)) :
+                                     c == '!' ?
+                                       (sscanf( buf + 2, "%u", &t1 ) != 1) :
+                                       c == 'N' || c == 'F' || c == 'T' || c 
== '+' || c == '&' || c == '-' || c == '=' || c == '_' || c == '|' ?
+                                         (sscanf( buf + 2, "%u %u", &t1, &t2 ) 
!= 2) :
+                                         c != '^' ?
+                                           (sscanf( buf + 2, "%u %u %u", &t1, 
&t2, &t3 ) != 3) :
+                                           (sscanf( buf + 2, "%u %u %u %u", 
&t1, &t2, &t3, &t4 ) != 4))
+                               {
+                                       error( "Error: malformed journal entry 
at %s:%d\n", svars->jname, line );
+                                       goto jbail;
+                               }
+                               if (c == 'N') {
+                                       svars->maxuid[t1] = t2;
+                               } else if (c == 'F') {
+                                       svars->finduid[t1] = t2;
+                               } else if (c == 'T') {
+                                       *uint_array_append( 
&svars->trashed_msgs[t1] ) = t2;
+                               } else if (c == '!') {
+                                       svars->maxxfuid = t1;
+                               } else if (c == '|') {
+                                       svars->uidval[F] = t1;
+                                       svars->uidval[N] = t2;
+                               } else if (c == '+') {
+                                       srec = nfzalloc( sizeof(*srec) );
+                                       srec->uid[F] = t1;
+                                       srec->uid[N] = t2;
+                                       debug( "  new entry(%u,%u)\n", t1, t2 );
+                                       srec->status = S_PENDING;
+                                       *svars->srecadd = srec;
+                                       svars->srecadd = &srec->next;
+                                       svars->nsrecs++;
+                               } else {
+                                       for (nsrec = srec; srec; srec = 
srec->next)
+                                               if (srec->uid[F] == t1 && 
srec->uid[N] == t2)
+                                                       goto syncfnd;
+                                       for (srec = svars->srecs; srec != 
nsrec; srec = srec->next)
+                                               if (srec->uid[F] == t1 && 
srec->uid[N] == t2)
+                                                       goto syncfnd;
+                                       error( "Error: journal entry at %s:%d 
refers to non-existing sync state entry\n", svars->jname, line );
+                                       goto jbail;
+                                 syncfnd:
+                                       debugn( "  entry(%u,%u,%u) ", 
srec->uid[F], srec->uid[N], srec->flags );
+                                       switch (c) {
+                                       case '-':
+                                               debug( "killed\n" );
+                                               srec->status = S_DEAD;
+                                               break;
+                                       case '=':
+                                               debug( "aborted\n" );
+                                               if (svars->maxxfuid < 
srec->uid[F])
+                                                       svars->maxxfuid = 
srec->uid[F];
+                                               srec->status = S_DEAD;
+                                               break;
+                                       case '#':
+                                               memcpy( srec->tuid, buf + tn + 
2, TUIDL );
+                                               debug( "TUID now %." 
stringify(TUIDL) "s\n", srec->tuid );
+                                               break;
+                                       case '&':
+                                               debug( "TUID %." 
stringify(TUIDL) "s lost\n", srec->tuid );
+                                               srec->tuid[0] = 0;
+                                               break;
+                                       case '<':
+                                               debug( "far side now %u\n", t3 
);
+                                               assign_uid( svars, srec, F, t3 
);
+                                               break;
+                                       case '>':
+                                               debug( "near side now %u\n", t3 
);
+                                               assign_uid( svars, srec, N, t3 
);
+                                               break;
+                                       case '*':
+                                               debug( "flags now %u\n", t3 );
+                                               srec->flags = (uchar)t3;
+                                               srec->aflags[F] = 
srec->aflags[N] = 0;  // Clear F_DELETED from purge
+                                               srec->wstate &= ~W_PURGE;
+                                               break;
+                                       case '~':
+                                               debug( "status now %#x\n", t3 );
+                                               srec->status = (uchar)t3;
+                                               break;
+                                       case '_':
+                                               debug( "has placeholder now\n" 
);
+                                               srec->status = S_PENDING;  // 
Pre-1.4 legacy only
+                                               srec->status |= !srec->uid[F] ? 
S_DUMMY(F) : S_DUMMY(N);
+                                               break;
+                                       case '^':
+                                               debug( "is being upgraded, 
flags %u, srec flags %u\n", t3, t4 );
+                                               srec->pflags = (uchar)t3;
+                                               srec->flags = (uchar)t4;
+                                               srec = upgrade_srec( svars, 
srec );
+                                               break;
+                                       default:
+                                               error( "Error: unrecognized 
journal entry at %s:%d\n", svars->jname, line );
+                                               goto jbail;
+                                       }
+                               }
+                       }
+               }
+               fclose( jfp );
+               sort_uint_array( svars->trashed_msgs[F].array );
+               sort_uint_array( svars->trashed_msgs[N].array );
+       } else {
+               if (errno != ENOENT) {
+                       sys_error( "Error: cannot read journal %s", 
svars->jname );
+                       return 0;
+               }
+       }
+       svars->replayed = line;
+
+       return 1;
+}
+
+static void
+create_state( sync_vars_t *svars )
+{
+       if (!(svars->nfp = fopen( svars->nname, "w" ))) {
+               sys_error( "Error: cannot create new sync state %s", 
svars->nname );
+               exit( 1 );
+       }
+}
+
+void
+jFprintf( sync_vars_t *svars, const char *msg, ... )
+{
+       va_list va;
+
+       if (JLimit && !--JLimit)
+               exit( 101 );
+       if (!svars->jfp) {
+               create_state( svars );
+               if (!(svars->jfp = fopen( svars->jname, svars->replayed ? "a" : 
"w" ))) {
+                       sys_error( "Error: cannot create journal %s", 
svars->jname );
+                       exit( 1 );
+               }
+               setlinebuf( svars->jfp );
+               if (!svars->replayed)
+                       Fprintf( svars->jfp, JOURNAL_VERSION "\n" );
+       }
+       va_start( va, msg );
+       vFprintf( svars->jfp, msg, va );
+       va_end( va );
+       if (JLimit && !--JLimit)
+               exit( 100 );
+}
+
+void
+save_state( sync_vars_t *svars )
+{
+       // If no change was made, the state is also unmodified.
+       if (!svars->jfp && !svars->replayed)
+               return;
+
+       if (!svars->nfp)
+               create_state( svars );
+       Fprintf( svars->nfp,
+                "FarUidValidity %u\nNearUidValidity %u\nMaxPulledUid 
%u\nMaxPushedUid %u\n",
+                svars->uidval[F], svars->uidval[N], svars->maxuid[F], 
svars->maxuid[N] );
+       if (svars->maxxfuid)
+               Fprintf( svars->nfp, "MaxExpiredFarUid %u\n", svars->maxxfuid );
+       Fprintf( svars->nfp, "\n" );
+       for (sync_rec_t *srec = svars->srecs; srec; srec = srec->next) {
+               if (srec->status & S_DEAD)
+                       continue;
+               char fbuf[16];  // enlarge when support for keywords is added
+               make_flags( srec->flags, fbuf );
+               Fprintf( svars->nfp, "%u %u %s%s%s\n", srec->uid[F], 
srec->uid[N],
+                        (srec->status & S_DUMMY(F)) ? "<" : (srec->status & 
S_DUMMY(N)) ? ">" : "",
+                        (srec->status & S_SKIPPED) ? "^" : (srec->status & 
S_EXPIRED) ? "~" : "", fbuf );
+       }
+
+       Fclose( svars->nfp, 1 );
+       if (svars->jfp)
+               Fclose( svars->jfp, 0 );
+       if (!(DFlags & KEEPJOURNAL)) {
+               // Order is important!
+               if (rename( svars->nname, svars->dname ))
+                       warn( "Warning: cannot commit sync state %s\n", 
svars->dname );
+               else if (unlink( svars->jname ))
+                       warn( "Warning: cannot delete journal %s\n", 
svars->jname );
+       }
+}
+
+void
+delete_state( sync_vars_t *svars )
+{
+       unlink( svars->nname );
+       unlink( svars->jname );
+       if (unlink( svars->dname ) || unlink( svars->lname )) {
+               sys_error( "Error: channel %s: sync state cannot be deleted", 
svars->chan->name );
+               svars->ret = SYNC_FAIL;
+       }
+}
+
+
+void
+assign_uid( sync_vars_t *svars, sync_rec_t *srec, int t, uint uid )
+{
+       srec->uid[t] = uid;
+       if (uid == svars->maxuid[t] + 1)
+               svars->maxuid[t] = uid;
+       srec->status &= ~S_PENDING;
+       srec->wstate &= ~W_UPGRADE;
+       srec->tuid[0] = 0;
+}
+
+void
+assign_tuid( sync_vars_t *svars, sync_rec_t *srec )
+{
+       for (uint i = 0; i < TUIDL; i++) {
+               uchar c = arc4_getbyte() & 0x3f;
+               srec->tuid[i] = (char)(c < 26 ? c + 'A' : c < 52 ? c + 'a' - 26 
:
+                                      c < 62 ? c + '0' - 52 : c == 62 ? '+' : 
'/');
+       }
+       JLOG( "# %u %u %." stringify(TUIDL) "s", (srec->uid[F], srec->uid[N], 
srec->tuid), "new TUID" );
+}
+
+int
+match_tuids( sync_vars_t *svars, int t, message_t *msgs )
+{
+       message_t *tmsg, *ntmsg = NULL;
+       const char *diag;
+       int num_lost = 0;
+
+       for (sync_rec_t *srec = svars->srecs; srec; srec = srec->next) {
+               if (srec->status & S_DEAD)
+                       continue;
+               if (!srec->uid[t] && srec->tuid[0]) {
+                       debug( "pair(%u,%u) TUID %." stringify(TUIDL) "s\n", 
srec->uid[F], srec->uid[N], srec->tuid );
+                       for (tmsg = ntmsg; tmsg; tmsg = tmsg->next) {
+                               if (tmsg->status & M_DEAD)
+                                       continue;
+                               if (tmsg->tuid[0] && !memcmp( tmsg->tuid, 
srec->tuid, TUIDL )) {
+                                       diag = (tmsg == ntmsg) ? "adjacently" : 
"after gap";
+                                       goto mfound;
+                               }
+                       }
+                       for (tmsg = msgs; tmsg != ntmsg; tmsg = tmsg->next) {
+                               if (tmsg->status & M_DEAD)
+                                       continue;
+                               if (tmsg->tuid[0] && !memcmp( tmsg->tuid, 
srec->tuid, TUIDL )) {
+                                       diag = "after reset";
+                                       goto mfound;
+                               }
+                       }
+                       JLOG( "& %u %u", (srec->uid[F], srec->uid[N]), "TUID 
lost" );
+                       // Note: status remains S_PENDING.
+                       srec->tuid[0] = 0;
+                       num_lost++;
+                       continue;
+                 mfound:
+                       tmsg->srec = srec;
+                       srec->msg[t] = tmsg;
+                       ntmsg = tmsg->next;
+                       ASSIGN_UID( srec, t, tmsg->uid, "TUID matched %s", diag 
);
+               }
+       }
+       return num_lost;
+}
+
+sync_rec_t *
+upgrade_srec( sync_vars_t *svars, sync_rec_t *srec )
+{
+       // Create an entry and append it to the current one.
+       sync_rec_t *nsrec = nfzalloc( sizeof(*nsrec) );
+       nsrec->next = srec->next;
+       srec->next = nsrec;
+       if (svars->srecadd == &srec->next)
+               svars->srecadd = &nsrec->next;
+       // Move the placeholder to the new entry.
+       int t = (srec->status & S_DUMMY(F)) ? F : N;
+       nsrec->uid[t] = srec->uid[t];
+       srec->uid[t] = 0;
+       if (srec->msg[t]) {  // NULL during journal replay; is assigned later.
+               nsrec->msg[t] = srec->msg[t];
+               nsrec->msg[t]->srec = nsrec;
+               srec->msg[t] = NULL;
+       }
+       // Mark the original entry for upgrade.
+       srec->status = (srec->status & ~(S_DUMMY(F)|S_DUMMY(N))) | S_PENDING;
+       srec->wstate |= W_UPGRADE;
+       // Mark the placeholder for nuking.
+       nsrec->wstate = W_PURGE;
+       nsrec->aflags[t] = F_DELETED;
+       return nsrec;
+}


_______________________________________________
isync-devel mailing list
isync-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/isync-devel

Reply via email to