commit 0da273686f7ca5a5be7b27cc27480b52465fea24
Author: Oswald Buddenhagen <o...@users.sf.net>
Date:   Fri Jun 17 15:57:56 2022 +0200

    rework flag propagation during placeholder upgrade
    
    Don't implicitly propagate flags with upgrades. The user asked for
    replacing the body, so do just that. If they also asked for flag
    propagation, handle it like the case without upgrade as far as possible.
    This makes async parallel flag propagation in the opposite direction
    robust, while still being reasonably simple.

 src/run-tests.pl | 16 +++++---
 src/sync.c       | 95 +++++++++++++++++++++++++++++++-----------------
 src/sync_state.c | 21 ++++++++---
 3 files changed, 88 insertions(+), 44 deletions(-)

diff --git a/src/run-tests.pl b/src/run-tests.pl
index fceb5c22..02cd37a8 100755
--- a/src/run-tests.pl
+++ b/src/run-tests.pl
@@ -1066,20 +1066,24 @@ my @X21 = (
 test("max size", \@x20, \@X21, \@O21);
 
 my @x22 = (
-  C, 0, B,
+  E, 0, B,
   A, "*", "", "",
-  B, "**", "", "",
-  C, "*?", "*<", "*F*",
+  B, "*PR*", "", "",
+  C, "*PR?", "*<DP", "*DFP*",
+  D, "*PR?", "*<DP", "*DP*",
+  E, "*PR*", "*>DP", "*DP?",
   A, "", "*", "*",
-  B, "", "*>", "*F?",
+  B, "", "*>DP", "*DFP?",
 );
 
 my @X22 = (
   C, 0, B,
-  B, "", ">->", "^*",
+  B, "", ">->D+R", "^PR*",
   B, "", "", "&1/",
-  C, "^F*", "<-<+F", "",
+  C, "^FPR*", "<-<D+FR", "-D+R",
   C, "&1+T", "^", "&",
+  D, "", "-D+R", "-D+R",
+  E, "", "-D+R", "-D+R",
 );
 test("max size + flagging", \@x22, \@X22, \@O21);
 
diff --git a/src/sync.c b/src/sync.c
index 4099742e..c9cfbe8a 100644
--- a/src/sync.c
+++ b/src/sync.c
@@ -289,12 +289,21 @@ msg_fetched( int sts, void *aux )
                        return;
                }
 
-               vars->data.flags = sanitize_flags( vars->data.flags, svars, t );
-               if (srec && !(srec->status & S_UPGRADE)) {
-                       if (vars->data.flags) {
-                               srec->pflags = vars->data.flags;
-                               JLOG( "%% %u %u %u", (srec->uid[F], 
srec->uid[N], srec->pflags),
-                                     "%sing with flags %s", (str_hl[t], 
fmt_lone_flags( srec->pflags ).str) );
+               if (srec && (srec->status & S_UPGRADE)) {
+                       vars->data.flags = (srec->pflags | srec->aflags[t]) & 
~srec->dflags[t];
+                       if (srec->aflags[t] || srec->dflags[t]) {
+                               JLOG( "$ %u %u %u %u", (srec->uid[F], 
srec->uid[N], srec->aflags[t], srec->dflags[t]),
+                                     "%sing upgrade with flags: +%s -%s",
+                                     (str_hl[t], fmt_flags( srec->aflags[t] 
).str, fmt_flags( srec->dflags[t] ).str) );
+                       }
+               } else {
+                       vars->data.flags = sanitize_flags( vars->data.flags, 
svars, t );
+                       if (srec) {
+                               if (vars->data.flags) {
+                                       srec->pflags = vars->data.flags;
+                                       JLOG( "%% %u %u %u", (srec->uid[F], 
srec->uid[N], srec->pflags),
+                                             "%sing with flags %s", 
(str_hl[t], fmt_lone_flags( srec->pflags ).str) );
+                               }
                        }
                }
 
@@ -1008,7 +1017,23 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
                        for (t = 0; t < 2; t++) {
                                if (srec->msg[t] && (srec->msg[t]->flags & 
F_DELETED))
                                        srec->status |= S_DEL(t);
-                               if (del[t]) {
+                               if (srec->status & S_UPGRADE) {
+                                       // Such records hold orphans by 
definition, so the del[] cases are irrelevant.
+                                       if (srec->uid[t]) {
+                                               // Direction towards the source 
message.
+                                               // The placeholder was already 
detached, so use its saved flags instead.
+                                               sflags = srec->pflags;
+                                               goto doflags;
+                                       }
+                                       // Direction towards the copy.
+                                       if (srec->msg[t^1]) {
+                                               // Flag propagation along 
placeholder upgrades must be explicitly requested,
+                                               // and is, at the source, 
handled like any other flag update.
+                                               sflags = srec->msg[t^1]->flags;
+                                               goto doflags;
+                                       }
+                                       debug( "  no %s\n", str_fn[t^1] );
+                               } else if (del[t]) {
                                        // The target was newly expunged, so 
there is nothing to update.
                                        // The deletion is propagated in the 
opposite iteration.
                                } else if (!srec->uid[t]) {
@@ -1044,8 +1069,11 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
                                        debug( "  no %s\n", str_fn[t^1] );
                                } else {
                                        // We have a source. The target may be 
in an unknown state.
+                                       sflags = srec->msg[t^1]->flags;
+
+                                 doflags:
                                        if (svars->chan->ops[t] & OP_FLAGS) {
-                                               sflags = sanitize_flags( 
srec->msg[t^1]->flags, svars, t );
+                                               sflags = sanitize_flags( 
sflags, svars, t );
                                                if ((t == F) && (srec->status & 
(S_EXPIRE|S_EXPIRED))) {
                                                        /* Don't propagate 
deletion resulting from expiration. */
                                                        debug( "  near side 
expiring\n" );
@@ -1076,32 +1104,15 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
                                ushort suflags = srec->msg[N] ? 
srec->msg[N]->flags : 0;
                                if ((muflags | suflags) & F_FLAGGED) {
                                        t = (srec->status & S_DUMMY(F)) ? F : N;
-                                       // We calculate the flags for the 
replicated message already now,
-                                       // because after an interruption the 
dummy may be already gone.
-                                       srec->pflags = ((srec->msg[t]->flags & 
~(F_SEEN|F_FLAGGED)) | srec->aflags[t]) & ~srec->dflags[t];
-                                       // Consequently, the srec's flags are 
committed right away as well.
-                                       srec->flags = (srec->flags | 
srec->aflags[t]) & ~srec->dflags[t];
-                                       JLOG( "^ %u %u %u %u", (srec->uid[F], 
srec->uid[N], srec->pflags, srec->flags),
-                                             "upgrading %s placeholder, 
dummy's flags %s, srec flags %s",
-                                             (str_fn[t], fmt_lone_flags( 
srec->pflags ).str, fmt_lone_flags( srec->flags ).str) );
+                                       // We save away the dummy's flags, 
because after an
+                                       // interruption it may be already gone. 
Filtering as above.
+                                       srec->pflags = srec->msg[t]->flags & 
~(F_SEEN | F_FLAGGED);
+                                       JLOG( "^ %u %u %u", (srec->uid[F], 
srec->uid[N], srec->pflags),
+                                             "upgrading %s placeholder, 
dummy's flags %s",
+                                             (str_fn[t], fmt_lone_flags( 
srec->pflags ).str) );
                                        nsrec = upgrade_srec( svars, srec, t );
                                }
                        }
-                       // This is separated, because the upgrade can come from 
the journal.
-                       if (srec->status & S_UPGRADE) {
-                               t = !srec->uid[F] ? F : N;
-                               tmsg = srec->msg[t^1];
-                               if ((svars->chan->ops[t] & OP_EXPUNGE) && 
(srec->pflags & F_DELETED)) {
-                                       JLOG( "- %u %u", (srec->uid[F], 
srec->uid[N]), "killing upgrade - would be expunged anyway" );
-                                       tmsg->srec = NULL;
-                                       srec->status = S_DEAD;
-                               } else {
-                                       // Pretend that the source message has 
the adjusted flags of the dummy.
-                                       tmsg->flags = srec->pflags;
-                                       tmsg->status |= M_FLAGS;
-                                       any_new[t] = 1;
-                               }
-                       }
                        srec = nsrec;  // Minor optimization: skip freshly 
created placeholder entry.
                }
        }
@@ -1136,12 +1147,30 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
                                                      "was previously skipped" 
);
                                        }
                                } else {
-                                       if (!(svars->chan->ops[t] & OP_NEW))
+                                       if (!(svars->chan->ops[t] & OP_NEW) && 
!(srec->status & S_UPGRADE))
                                                continue;
                                        if (!(srec->status & S_PENDING))
                                                continue;  // Nothing to do - 
the message is paired or expired
                                        // Propagation was scheduled, but we 
got interrupted
                                        debug( "unpropagated old message %u\n", 
tmsg->uid );
+
+                                       if (srec->status & S_UPGRADE) {
+                                               if ((svars->chan->ops[t] & 
OP_EXPUNGE) &&
+                                                   ((srec->pflags | 
srec->aflags[t]) & ~srec->dflags[t] & F_DELETED)) {
+                                                       // We can't just kill 
the entry, as we may be propagating flags
+                                                       // (in particular, 
F_DELETED) towards the real message.
+                                                       // No dummy is actually 
present, but pretend there is, so the
+                                                       // real message is 
considered new when trashing.
+                                                       srec->status = 
(srec->status & ~(S_PENDING | S_UPGRADE)) | S_DUMMY(t);
+                                                       JLOG( "~ %u %u %d", 
(srec->uid[F], srec->uid[N], srec->status & S_LOGGED),
+                                                             "canceling 
placeholder upgrade - would be expunged anyway" );
+                                                       continue;
+                                               }
+                                               // Prevent the driver from 
"completing" the flags, as we'll ignore them anyway.
+                                               tmsg->status |= M_FLAGS;
+                                               any_new[t] = 1;
+                                               continue;
+                                       }
                                }
                        } else {
                                // The 1st unknown message which should be 
known marks the end
@@ -1183,7 +1212,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
                                continue;
                        }
                        if (!(tmsg->flags & F_FLAGGED) && tmsg->size > 
svars->chan->stores[t]->max_size &&
-                           !(srec->status & (S_DUMMY(F) | S_DUMMY(N) | 
S_UPGRADE))) {
+                           !(srec->status & (S_DUMMY(F) | S_DUMMY(N)))) {
                                srec->status |= S_DUMMY(t);
                                JLOG( "_ %u %u", (srec->uid[F], srec->uid[N]), 
"placeholder only - too big" );
                        }
diff --git a/src/sync_state.c b/src/sync_state.c
index 82cc5f15..32d1eab9 100644
--- a/src/sync_state.c
+++ b/src/sync_state.c
@@ -308,9 +308,10 @@ load_state( sync_vars_t *svars )
                                case '*':
                                case '%':
                                case '~':
+                               case '^':
                                        bad = sscanf( buf + 2, "%u %u %u", &t1, 
&t2, &t3 ) != 3;
                                        break;
-                               case '^':
+                               case '$':
                                        bad = sscanf( buf + 2, "%u %u %u %u", 
&t1, &t2, &t3, &t4 ) != 4;
                                        break;
                                default:
@@ -411,11 +412,17 @@ load_state( sync_vars_t *svars )
                                        case '^':
                                                tn = (srec->status & 
S_DUMMY(F)) ? F : N;
                                                srec->pflags = (uchar)t3;
-                                               srec->flags = (uchar)t4;
-                                               debug( "upgrading %s 
placeholder, dummy's flags %s, srec flags %s\n",
-                                                      str_fn[tn], 
fmt_lone_flags( t3 ).str, fmt_lone_flags( t4 ).str );
+                                               debug( "upgrading %s 
placeholder, dummy's flags %s\n",
+                                                      str_fn[tn], 
fmt_lone_flags( t3 ).str );
                                                srec = upgrade_srec( svars, 
srec, tn );
                                                break;
+                                       case '$':
+                                               tn = !srec->uid[F] ? F : N;
+                                               srec->aflags[tn] = (uchar)t3;
+                                               srec->dflags[tn] = (uchar)t4;
+                                               debug( "flag update for %s now 
+%s -%s\n",
+                                                      str_fn[tn], fmt_flags( 
t3 ).str, fmt_flags( t4 ).str );
+                                               break;
                                        default:
                                                assert( !"Unhandled journal 
entry" );
                                        }
@@ -522,8 +529,12 @@ assign_uid( sync_vars_t *svars, sync_rec_t *srec, int t, uint uid )
        if (uid == svars->newmaxuid[t] + 1)
                svars->newmaxuid[t] = uid;
        if (uid) {
-               if (!(srec->status & S_UPGRADE))
+               if (srec->status & S_UPGRADE) {
+                       srec->flags = (srec->flags | srec->aflags[t]) & 
~srec->dflags[t];
+                       srec->aflags[t] = srec->dflags[t] = 0;  // Cleanup 
after journal replay
+               } else {
                        srec->flags = srec->pflags;
+               }
        }
        srec->status &= ~(S_PENDING | S_UPGRADE);
        srec->tuid[0] = 0;


_______________________________________________
isync-devel mailing list
isync-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/isync-devel

Reply via email to