commit b1842617f7700c56d3eede8ce6c5afdacf433fca
Author: Oswald Buddenhagen <[email protected]>
Date:   Sat Nov 30 13:03:12 2013 +0100

    make MaxMessages work for new mails as well
    
    this helps enormously on the first sync of a 100k message box with a
    limit of 1k messages. it also happens to make the syncing idempotent.
    
    in a few conditionals we now explicitly test whether max_messages is
    enabled instead of testing smaxxuid != 0: after an initial fetch with
    no important messages smaxxuid is still zero, yet we still have to skip
    over 99k messages in the case described above.

 src/run-tests.pl |   24 +++-------
 src/sync.c       |  109 +++++++++++++++++++++++++++++++++++++---------
 2 files changed, 95 insertions(+), 38 deletions(-)

diff --git a/src/run-tests.pl b/src/run-tests.pl
index 42cd4b1..7210cb3 100755
--- a/src/run-tests.pl
+++ b/src/run-tests.pl
@@ -174,7 +174,7 @@ test("slave max size", \@X11, \@X22, @O22);
 
 my @x30 = (
  [ 0,
-   1, 0, "F", 2, 0, "S", 3, 0, "S", 4, 0, "", 5, 0, "" ],
+   1, 0, "F", 2, 0, "", 3, 0, "S", 4, 0, "", 5, 0, "S", 6, 0, "" ],
  [ 0,
    ],
  [ 0, 0, 0,
@@ -184,26 +184,16 @@ my @x30 = (
 my @O31 = ("", "", "MaxMessages 3\n");
 #show("30", "31", "31");
 my @X31 = (
+ [ 6,
+   1, 1, "F", 2, 2, "", 3, 3, "S", 4, 4, "", 5, 5, "S", 6, 6, "" ],
  [ 5,
-   1, 1, "F", 2, 2, "S", 3, 3, "S", 4, 4, "", 5, 5, "" ],
- [ 5,
-   1, 1, "F", 2, 2, "S", 3, 3, "S", 4, 4, "", 5, 5, "" ],
- [ 5, 0, 0,
-   1, 1, "F", 2, 2, "S", 3, 3, "S", 4, 4, "", 5, 5, "" ],
+   1, 1, "F", 2, 2, "", 4, 3, "", 5, 4, "S", 6, 5, "" ],
+ [ 6, 2, 0,
+   1, 1, "F", 2, 2, "", 4, 3, "", 5, 4, "S", 6, 5, "" ],
 );
 test("max messages", \@x30, \@X31, @O31);
 
-my @O41 = ("", "", "MaxMessages 3\nExpunge Both\n");
-#show("40", "41", "41");
-my @X41 = (
- [ 5,
-   1, 1, "F", 2, 2, "S", 3, 3, "S", 4, 4, "", 5, 5, "" ],
- [ 5,
-   1, 1, "F", 3, 3, "S", 4, 4, "", 5, 5, "" ],
- [ 5, 2, 0,
-   1, 1, "F", 3, 3, "S", 4, 4, "", 5, 5, "" ],
-);
-test("max messages catch-up", \@X31, \@X41, @O41);
+test("max messages verification", \@X31, \@X31, @O31);
 
 my @x50 = (
  [ 6,
diff --git a/src/sync.c b/src/sync.c
index 24a3bb6..4cbee88 100644
--- a/src/sync.c
+++ b/src/sync.c
@@ -155,8 +155,10 @@ typedef struct {
        int flags_total[2], flags_done[2];
        int trash_total[2], trash_done[2];
        int maxuid[2]; /* highest UID that was already propagated */
+       int newmaxuid[2]; /* highest UID that is currently being propagated */
        int uidval[2]; /* UID validity value */
        int newuid[2]; /* TUID lookup makes sense only for UIDs >= this */
+       int mmaxxuid; /* highest expired UID on master during new message 
propagation */
        int smaxxuid; /* highest expired UID on slave */
 } sync_vars_t;
 
@@ -806,6 +808,9 @@ box_selected( int sts, void *aux )
                        goto bail;
                }
        }
+       svars->newmaxuid[M] = svars->maxuid[M];
+       svars->newmaxuid[S] = svars->maxuid[S];
+       svars->mmaxxuid = INT_MAX;
        line = 0;
        if ((jfp = fopen( svars->jname, "r" ))) {
                if (!stat( svars->nname, &st ) && fgets( buf, sizeof(buf), jfp 
)) {
@@ -829,7 +834,7 @@ box_selected( int sts, void *aux )
                                }
                                if (buf[0] == '#' ?
                                      (t3 = 0, (sscanf( buf + 2, "%d %d %n", 
&t1, &t2, &t3 ) < 2) || !t3 || (t - t3 != TUIDL + 3)) :
-                                     buf[0] == '(' || buf[0] == ')' || buf[0] 
== '{' || buf[0] == '}' ?
+                                     buf[0] == '(' || buf[0] == ')' || buf[0] 
== '{' || buf[0] == '}' || buf[0] == '!' ?
                                        (sscanf( buf + 2, "%d", &t1 ) != 1) :
                                        buf[0] == '+' || buf[0] == '&' || 
buf[0] == '-' || buf[0] == '|' || buf[0] == '/' || buf[0] == '\\' ?
                                          (sscanf( buf + 2, "%d %d", &t1, &t2 ) 
!= 2) :
@@ -846,6 +851,8 @@ box_selected( int sts, void *aux )
                                        svars->newuid[M] = t1;
                                else if (buf[0] == '}')
                                        svars->newuid[S] = t1;
+                               else if (buf[0] == '!')
+                                       svars->smaxxuid = t1;
                                else if (buf[0] == '|') {
                                        svars->uidval[M] = t1;
                                        svars->uidval[S] = t2;
@@ -853,6 +860,10 @@ box_selected( int sts, void *aux )
                                        srec = nfmalloc( sizeof(*srec) );
                                        srec->uid[M] = t1;
                                        srec->uid[S] = t2;
+                                       if (svars->newmaxuid[M] < t1)
+                                               svars->newmaxuid[M] = t1;
+                                       if (svars->newmaxuid[S] < t2)
+                                               svars->newmaxuid[S] = t2;
                                        debug( "  new entry(%d,%d)\n", t1, t2 );
                                        srec->msg[M] = srec->msg[S] = 0;
                                        srec->status = 0;
@@ -876,6 +887,8 @@ box_selected( int sts, void *aux )
                                        switch (buf[0]) {
                                        case '-':
                                                debug( "killed\n" );
+                                               if (srec->msg[M])
+                                                       srec->msg[M]->srec = 0;
                                                srec->status = S_DEAD;
                                                break;
                                        case '#':
@@ -1014,7 +1027,7 @@ box_selected( int sts, void *aux )
        mexcs = 0;
        nmexcs = rmexcs = 0;
        if (svars->ctx[M]->opts & OPEN_OLD) {
-               if (svars->smaxxuid) {
+               if (chan->max_messages) {
                        /* When messages have been expired on the slave, the 
master fetch is split into
                         * two ranges: The bulk fetch which corresponds with 
the most recent messages, and an
                         * exception list of messages which would have been 
expired if they weren't important. */
@@ -1198,10 +1211,14 @@ box_loaded( int sts, void *aux )
                         * - message is old (> 0) or expired (0) => ignore
                         * - message was skipped (-1) => ReNew
                         * - message was attempted, but failed (-2) => New
-                        * If new have no srec, the message is always New. */
+                        * If we have no srec, the message is always New. If 
messages were previously ignored
+                        * due to being excessive, they would now appear to be 
newer than the messages that
+                        * got actually synced, so make sure to look only at 
the newest ones. As some messages
+                        * may be already propagated before an interruption, 
and maxuid logging is delayed,
+                        * we need to track the newmaxuid separately. */
                        srec = tmsg->srec;
                        if (srec ? srec->uid[t] < 0 && (svars->chan->ops[t] & 
(srec->uid[t] == -1 ? OP_RENEW : OP_NEW))
-                                : (svars->chan->ops[t] & OP_NEW)) {
+                                : svars->newmaxuid[1-t] < tmsg->uid && 
(svars->chan->ops[t] & OP_NEW)) {
                                debug( "new message %d on %s\n", tmsg->uid, 
str_ms[1-t] );
                                if ((svars->chan->ops[t] & OP_EXPUNGE) && 
(tmsg->flags & F_DELETED))
                                        debug( "  -> not %sing - would be 
expunged anyway\n", str_hl[t] );
@@ -1220,7 +1237,11 @@ box_loaded( int sts, void *aux )
                                                srec->tuid[0] = 0;
                                                srec->uid[1-t] = tmsg->uid;
                                                srec->uid[t] = -2;
+                                               srec->msg[1-t] = tmsg;
+                                               srec->msg[t] = 0;
                                                tmsg->srec = srec;
+                                               if (svars->newmaxuid[1-t] < 
tmsg->uid)
+                                                       svars->newmaxuid[1-t] = 
tmsg->uid;
                                                Fprintf( svars->jfp, "+ %d 
%d\n", srec->uid[M], srec->uid[S] );
                                                debug( "  -> pair(%d,%d) 
created\n", srec->uid[M], srec->uid[S] );
                                        }
@@ -1346,6 +1367,10 @@ box_loaded( int sts, void *aux )
                                alive++;
                        }
                }
+               for (tmsg = svars->ctx[M]->msgs; tmsg; tmsg = tmsg->next) {
+                       if ((srec = tmsg->srec) && srec->tuid[0] && 
!(tmsg->flags & F_DELETED))
+                               alive++;
+               }
                todel = alive - svars->chan->max_messages;
                debug( "%d alive messages, %d excess - expiring\n", alive, 
todel );
                for (tmsg = svars->ctx[S]->msgs; tmsg; tmsg = tmsg->next) {
@@ -1353,6 +1378,7 @@ box_loaded( int sts, void *aux )
                                continue;
                        if (!(srec = tmsg->srec) || srec->uid[M] <= 0) {
                                /* We did not push the message, so it must be 
kept. */
+                               debug( "  old pair(%d,%d) unpropagated\n", 
srec->uid[M], srec->uid[S] );
                                todel--;
                        } else {
                                nflags = (tmsg->flags | srec->aflags[S]) & 
~srec->dflags[S];
@@ -1360,13 +1386,32 @@ box_loaded( int sts, void *aux )
                                        /* The message is not deleted, or is 
already (being) expired. */
                                        if ((nflags & F_FLAGGED) || !(nflags & 
F_SEEN)) {
                                                /* Important messages are 
always kept. */
+                                               debug( "  old pair(%d,%d) 
important\n", srec->uid[M], srec->uid[S] );
                                                todel--;
                                        } else if (todel > 0 ||
                                                   ((srec->status & 
(S_EXPIRE|S_EXPIRED)) == (S_EXPIRE|S_EXPIRED)) ||
                                                   ((srec->status & 
(S_EXPIRE|S_EXPIRED)) && (tmsg->flags & F_DELETED))) {
                                                /* The message is excess or was 
already (being) expired. */
                                                srec->status |= S_NEXPIRE;
-                                               debug( "  pair(%d,%d)\n", 
srec->uid[M], srec->uid[S] );
+                                               debug( "  old pair(%d,%d) 
expired\n", srec->uid[M], srec->uid[S] );
+                                               todel--;
+                                       }
+                               }
+                       }
+               }
+               for (tmsg = svars->ctx[M]->msgs; tmsg; tmsg = tmsg->next) {
+                       if ((srec = tmsg->srec) && srec->tuid[0]) {
+                               nflags = tmsg->flags;
+                               if (!(nflags & F_DELETED)) {
+                                       if ((nflags & F_FLAGGED) || !(nflags & 
F_SEEN)) {
+                                               /* Important messages are 
always fetched. */
+                                               debug( "  new pair(%d,%d) 
important\n", srec->uid[M], srec->uid[S] );
+                                               todel--;
+                                       } else if (todel > 0) {
+                                               /* The message is excess. */
+                                               srec->status |= S_NEXPIRE;
+                                               debug( "  new pair(%d,%d) 
expired\n", srec->uid[M], srec->uid[S] );
+                                               svars->mmaxxuid = srec->uid[M];
                                                todel--;
                                        }
                                }
@@ -1374,23 +1419,34 @@ box_loaded( int sts, void *aux )
                }
                debug( "%d excess messages remain\n", todel );
                for (srec = svars->srecs; srec; srec = srec->next) {
-                       if ((srec->status & (S_DEAD|S_DONE)) || !srec->msg[S])
+                       if (srec->status & S_DEAD)
                                continue;
-                       nex = (srec->status / S_NEXPIRE) & 1;
-                       if (nex != ((srec->status / S_EXPIRED) & 1)) {
-                               /* The record needs a state change ... */
-                               if (nex != ((srec->status / S_EXPIRE) & 1)) {
-                                       /* ... and we need to start a 
transaction. */
-                                       Fprintf( svars->jfp, "~ %d %d %d\n", 
srec->uid[M], srec->uid[S], nex );
-                                       debug( "  pair(%d,%d): %d (pre)\n", 
srec->uid[M], srec->uid[S], nex );
-                                       srec->status = (srec->status & 
~S_EXPIRE) | (nex * S_EXPIRE);
+                       if (!srec->tuid[0]) {
+                               if (!srec->msg[S])
+                                       continue;
+                               nex = (srec->status / S_NEXPIRE) & 1;
+                               if (nex != ((srec->status / S_EXPIRED) & 1)) {
+                                       /* The record needs a state change ... 
*/
+                                       if (nex != ((srec->status / S_EXPIRE) & 
1)) {
+                                               /* ... and we need to start a 
transaction. */
+                                               Fprintf( svars->jfp, "~ %d %d 
%d\n", srec->uid[M], srec->uid[S], nex );
+                                               debug( "  pair(%d,%d): %d 
(pre)\n", srec->uid[M], srec->uid[S], nex );
+                                               srec->status = (srec->status & 
~S_EXPIRE) | (nex * S_EXPIRE);
+                                       } else {
+                                               /* ... but the "right" 
transaction is already pending. */
+                                               debug( "  pair(%d,%d): %d 
(pending)\n", srec->uid[M], srec->uid[S], nex );
+                                       }
                                } else {
-                                       /* ... but the "right" transaction is 
already pending. */
-                                       debug( "  pair(%d,%d): %d (pending)\n", 
srec->uid[M], srec->uid[S], nex );
+                                       /* Note: the "wrong" transaction may be 
pending here,
+                                        * e.g.: S_NEXPIRE = 0, S_EXPIRE = 1, 
S_EXPIRED = 0. */
                                }
                        } else {
-                               /* Note: the "wrong" transaction may be pending 
here,
-                                * e.g.: S_NEXPIRE = 0, S_EXPIRE = 1, S_EXPIRED 
= 0. */
+                               if (srec->status & S_NEXPIRE) {
+                                       Fprintf( svars->jfp, "- %d %d\n", 
srec->uid[M], srec->uid[S] );
+                                       debug( "  pair(%d,%d): 1 (abort)\n", 
srec->uid[M], srec->uid[S] );
+                                       srec->msg[M]->srec = 0;
+                                       srec->status = S_DEAD;
+                               }
                        }
                }
        }
@@ -1515,6 +1571,16 @@ msg_copied_p2( sync_vars_t *svars, sync_rec_t *srec, int 
t, int uid )
                srec->uid[t] = uid;
                srec->tuid[0] = 0;
        }
+       if (t == S && svars->mmaxxuid < srec->uid[M]) {
+               /* If we have so many new messages that some of them are 
instantly expired,
+                * but some are still propagated because they are important, we 
need to
+                * ensure explicitly that the bulk fetch limit is upped. */
+               svars->mmaxxuid = INT_MAX;
+               if (svars->smaxxuid < srec->uid[S] - 1) {
+                       svars->smaxxuid = srec->uid[S] - 1;
+                       Fprintf( svars->jfp, "! %d\n", svars->smaxxuid );
+               }
+       }
 }
 
 static void msgs_found_new( int sts, void *aux );
@@ -1735,13 +1801,14 @@ box_closed_p2( sync_vars_t *svars, int t )
        if (!(svars->state[1-t] & ST_CLOSED))
                return;
 
-       if (((svars->state[M] | svars->state[S]) & ST_DID_EXPUNGE) || 
svars->smaxxuid) {
+       if (((svars->state[M] | svars->state[S]) & ST_DID_EXPUNGE) || 
svars->chan->max_messages) {
                /* This cleanup is not strictly necessary, as the next full sync
                   would throw out the dead entries anyway. But ... */
+               debug( "purging obsolete entries\n" );
 
                minwuid = INT_MAX;
-               if (svars->smaxxuid) {
-                       debug( "preparing entry purge - max expired slave uid 
is %d\n", svars->smaxxuid );
+               if (svars->chan->max_messages) {
+                       debug( "  max expired slave uid is %d\n", 
svars->smaxxuid );
                        for (srec = svars->srecs; srec; srec = srec->next) {
                                if (srec->status & S_DEAD)
                                        continue;

------------------------------------------------------------------------------
Sponsored by Intel(R) XDK 
Develop, test and display web and hybrid apps with a single code base.
Download it for free now!
http://pubads.g.doubleclick.net/gampad/clk?id=111408631&iu=/4140/ostg.clktrk
_______________________________________________
isync-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/isync-devel

Reply via email to