commit 4423a932f3e280a6aa8afd2ae611c9064e840dae
Author: Oswald Buddenhagen <o...@users.sf.net>
Date:   Mon Dec 14 23:16:01 2020 +0100

    add forced async mode to proxy driver
    
    to test async operation of the syncing core while using the synchronous
    maildir driver, we add a mode to the proxy driver where it queues
    callback invocations to the next main loop iteration.

 src/common.h         |   1 +
 src/driver.h         |   7 +++
 src/drv_imap.c       |   2 +-
 src/drv_proxy.c      | 147 +++++++++++++++++++++++++++++++++++++------
 src/drv_proxy_gen.pl |  25 ++++++--
 src/main.c           |  20 +++---
 src/run-tests.pl     |  42 ++++++++-----
 7 files changed, 196 insertions(+), 48 deletions(-)

diff --git a/src/common.h b/src/common.h
index 8ced726..5518f56 100644
--- a/src/common.h
+++ b/src/common.h
@@ -107,6 +107,7 @@ typedef unsigned long ulong;
 #define VERBOSE         0x800
 #define KEEPJOURNAL     0x1000
 #define ZERODELAY       0x2000
+#define FORCEASYNC      0x4000
 
 extern int DFlags;
 extern int JLimit;
diff --git a/src/driver.h b/src/driver.h
index 250534c..fa394b2 100644
--- a/src/driver.h
+++ b/src/driver.h
@@ -127,6 +127,9 @@ typedef struct {
 #define DRV_CANCELED    4
 
 /* All memory belongs to the driver's user, unless stated otherwise. */
+// If the driver is NOT DRV_ASYNC, memory owned by the driver returned
+// through callbacks MUST remain valid until a related subsequent command
+// is invoked, as the proxy driver may deliver these pointers with delay.
 
 /*
    This flag says that the driver CAN store messages with CRLFs,
@@ -138,6 +141,10 @@ typedef struct {
    This flag says that the driver will act upon (DFlags & VERBOSE).
 */
 #define DRV_VERBOSE     2
+/*
+   This flag says that the driver operates asynchronously.
+*/
+#define DRV_ASYNC       4
 
 #define LIST_INBOX      1
 #define LIST_PATH       2
diff --git a/src/drv_imap.c b/src/drv_imap.c
index 310ceb6..8c93bda 100644
--- a/src/drv_imap.c
+++ b/src/drv_imap.c
@@ -3675,7 +3675,7 @@ imap_parse_store( conffile_t *cfg, store_conf_t **storep )
 static uint
 imap_get_caps( store_t *gctx ATTR_UNUSED )
 {
-       return DRV_CRLF | DRV_VERBOSE;
+       return DRV_CRLF | DRV_VERBOSE | DRV_ASYNC;
 }
 
 struct driver imap_driver = {
diff --git a/src/drv_proxy.c b/src/drv_proxy.c
index 0cd9299..1187696 100644
--- a/src/drv_proxy.c
+++ b/src/drv_proxy.c
@@ -21,9 +21,12 @@
 
 #include "driver.h"
 
+#include <assert.h>
 #include <limits.h>
 #include <stdlib.h>
 
+typedef struct gen_cmd gen_cmd_t;
+
 typedef union proxy_store {
        store_t gen;
        struct {
@@ -32,6 +35,9 @@ typedef union proxy_store {
                uint ref_count;
                driver_t *real_driver;
                store_t *real_store;
+               gen_cmd_t *done_cmds, **done_cmds_append;
+               gen_cmd_t *check_cmds, **check_cmds_append;
+               wakeup_t wakeup;
 
                void (*bad_callback)( void *aux );
                void *bad_callback_aux;
@@ -78,8 +84,10 @@ proxy_make_flags( uchar flags, char *buf )
 static void
 proxy_store_deref( proxy_store_t *ctx )
 {
-       if (!--ctx->ref_count)
+       if (!--ctx->ref_count) {
+               assert( !pending_wakeup( &ctx->wakeup ) );
                free( ctx );
+       }
 }
 
 static int curr_tag;
@@ -87,11 +95,24 @@ static int curr_tag;
 #define GEN_CMD \
        uint ref_count; \
        int tag; \
-       proxy_store_t *ctx;
+       proxy_store_t *ctx; \
+       gen_cmd_t *next; \
+       void (*queued_cb)( gen_cmd_t *gcmd );
 
-typedef struct {
+struct gen_cmd {
        GEN_CMD
-} gen_cmd_t;
+};
+
+#define GEN_STS_CMD \
+       GEN_CMD \
+       int sts;
+
+typedef union {
+       gen_cmd_t gen;
+       struct {
+               GEN_STS_CMD
+       };
+} gen_sts_cmd_t;
 
 static gen_cmd_t *
 proxy_cmd_new( proxy_store_t *ctx, uint sz )
@@ -113,6 +134,67 @@ proxy_cmd_done( gen_cmd_t *cmd )
        }
 }
 
+static void
+proxy_wakeup( void *aux )
+{
+       proxy_store_t *ctx = (proxy_store_t *)aux;
+
+       gen_cmd_t *cmd = ctx->done_cmds;
+       assert( cmd );
+       if (!(ctx->done_cmds = cmd->next))
+               ctx->done_cmds_append = &ctx->done_cmds;
+       else
+               conf_wakeup( &ctx->wakeup, 0 );
+       cmd->queued_cb( cmd );
+       proxy_cmd_done( cmd );
+}
+
+static void
+proxy_invoke_cb( gen_cmd_t *cmd, void (*cb)( gen_cmd_t * ), int checked, const char *name )
+{
+       if (DFlags & FORCEASYNC) {
+               debug( "%s[% 2d] Callback queue %s%s\n", cmd->ctx->label, cmd->tag, name, checked ? " (checked)" : "" );
+               cmd->queued_cb = cb;
+               cmd->next = NULL;
+               if (checked) {
+                       *cmd->ctx->check_cmds_append = cmd;
+                       cmd->ctx->check_cmds_append = &cmd->next;
+               } else {
+                       *cmd->ctx->done_cmds_append = cmd;
+                       cmd->ctx->done_cmds_append = &cmd->next;
+                       conf_wakeup( &cmd->ctx->wakeup, 0 );
+               }
+       } else {
+               cb( cmd );
+               proxy_cmd_done( cmd );
+       }
+}
+
+static void
+proxy_flush_checked_cmds( proxy_store_t *ctx )
+{
+       if (ctx->check_cmds) {
+               *ctx->done_cmds_append = ctx->check_cmds;
+               ctx->done_cmds_append = ctx->check_cmds_append;
+               ctx->check_cmds_append = &ctx->check_cmds;
+               ctx->check_cmds = NULL;
+               conf_wakeup( &ctx->wakeup, 0 );
+       }
+}
+
+static void
+proxy_cancel_checked_cmds( proxy_store_t *ctx )
+{
+       gen_cmd_t *cmd;
+
+       while ((cmd = ctx->check_cmds)) {
+               if (!(ctx->check_cmds = cmd->next))
+                       ctx->check_cmds_append = &ctx->check_cmds;
+               ((gen_sts_cmd_t *)cmd)->sts = DRV_CANCELED;
+               cmd->queued_cb( cmd );
+       }
+}
+
 #if 0
 //# TEMPLATE GETTER
 static @type@proxy_@name@( store_t *gctx )
@@ -155,9 +237,10 @@ static @type@proxy_@name@( store_t *gctx@decl_args@ )
 
 //# TEMPLATE CALLBACK
 typedef union {
-       gen_cmd_t gen;
+       @gen_cmd_t@ gen;
        struct {
-               GEN_CMD
+               @GEN_CMD@
+               @decl_cb_state@
                void (*callback)( @decl_cb_args@void *aux );
                void *callback_aux;
                @decl_state@
@@ -165,16 +248,24 @@ typedef union {
 } @name@_cmd_t;
 
 static void
-proxy_@name@_cb( @decl_cb_args@void *aux )
+proxy_do_@name@_cb( gen_cmd_t *gcmd )
 {
-       @name@_cmd_t *cmd = (@name@_cmd_t *)aux;
+       @name@_cmd_t *cmd = (@name@_cmd_t *)gcmd;
 
        @pre_print_cb_args@
        debug( "%s[% 2d] Callback enter @name@@print_fmt_cb_args@\n", cmd->ctx->label, cmd->tag@print_pass_cb_args@ );
        @print_cb_args@
        cmd->callback( @pass_cb_args@cmd->callback_aux );
        debug( "%s[% 2d] Callback leave @name@\n", cmd->ctx->label, cmd->tag );
-       proxy_cmd_done( &cmd->gen );
+}
+
+static void
+proxy_@name@_cb( @decl_cb_args@void *aux )
+{
+       @name@_cmd_t *cmd = (@name@_cmd_t *)aux;
+
+       @save_cb_args@
+       proxy_invoke_cb( @gen_cmd@, proxy_do_@name@_cb, @checked@, "@name@" );
 }
 
 static @type@proxy_@name@( store_t *gctx@decl_args@, void (*cb)( @decl_cb_args@void *aux ), void *aux )
@@ -190,15 +281,15 @@ static @type@proxy_@name@( store_t *gctx@decl_args@, void (*cb)( @decl_cb_args@v
        @print_args@
        ctx->real_driver->@name@( ctx->real_store@pass_args@, proxy_@name@_cb, cmd );
        debug( "%s[% 2d] Leave @name@\n", ctx->label, cmd->tag );
-       proxy_cmd_done( &cmd->gen );
+       proxy_cmd_done( @gen_cmd@ );
 }
 //# END
 
 //# UNDEFINE list_store_print_fmt_cb_args
 //# UNDEFINE list_store_print_pass_cb_args
 //# DEFINE list_store_print_cb_args
-       if (sts == DRV_OK) {
-               for (string_list_t *box = boxes; box; box = box->next)
+       if (cmd->sts == DRV_OK) {
+               for (string_list_t *box = cmd->boxes; box; box = box->next)
                        debug( "  %s\n", box->string );
        }
 //# END
@@ -217,21 +308,21 @@ static @type@proxy_@name@( store_t *gctx@decl_args@, void (*cb)( @decl_cb_args@v
        }
 //# END
 //# DEFINE load_box_print_fmt_cb_args , sts=%d, total=%d, recent=%d
-//# DEFINE load_box_print_pass_cb_args , sts, total_msgs, recent_msgs
+//# DEFINE load_box_print_pass_cb_args , cmd->sts, cmd->total_msgs, cmd->recent_msgs
 //# DEFINE load_box_print_cb_args
-       if (sts == DRV_OK) {
+       if (cmd->sts == DRV_OK) {
                static char fbuf[as(Flags) + 1];
-               for (message_t *msg = msgs; msg; msg = msg->next)
+               for (message_t *msg = cmd->msgs; msg; msg = msg->next)
                        debug( "  uid=%-5u flags=%-4s size=%-6u tuid=%." stringify(TUIDL) "s\n",
                               msg->uid, (msg->status & M_FLAGS) ? (proxy_make_flags( msg->flags, fbuf ), fbuf) : "?", msg->size, *msg->tuid ? msg->tuid : "?" );
        }
 //# END
 
 //# DEFINE find_new_msgs_print_fmt_cb_args , sts=%d
-//# DEFINE find_new_msgs_print_pass_cb_args , sts
+//# DEFINE find_new_msgs_print_pass_cb_args , cmd->sts
 //# DEFINE find_new_msgs_print_cb_args
-       if (sts == DRV_OK) {
-               for (message_t *msg = msgs; msg; msg = msg->next)
+       if (cmd->sts == DRV_OK) {
+               for (message_t *msg = cmd->msgs; msg; msg = msg->next)
                        debug( "  uid=%-5u tuid=%." stringify(TUIDL) "s\n", msg->uid, msg->tuid );
        }
 //# END
@@ -251,7 +342,7 @@ static @type@proxy_@name@( store_t *gctx@decl_args@, void (*cb)( @decl_cb_args@v
 //# DEFINE fetch_msg_print_fmt_cb_args , flags=%s, date=%lld, size=%u
 //# DEFINE fetch_msg_print_pass_cb_args , fbuf, (long long)cmd->data->date, cmd->data->len
 //# DEFINE fetch_msg_print_cb_args
-       if (sts == DRV_OK && (DFlags & DEBUG_DRV_ALL)) {
+       if (cmd->sts == DRV_OK && (DFlags & DEBUG_DRV_ALL)) {
                printf( "%s=========\n", cmd->ctx->label );
                fwrite( cmd->data->data, cmd->data->len, 1, stdout );
                printf( "%s=========\n", cmd->ctx->label );
@@ -281,14 +372,29 @@ static @type@proxy_@name@( store_t *gctx@decl_args@, void (*cb)( @decl_cb_args@v
 //# END
 //# DEFINE set_msg_flags_print_fmt_args , uid=%u, add=%s, del=%s
 //# DEFINE set_msg_flags_print_pass_args , uid, fbuf1, fbuf2
+//# DEFINE set_msg_flags_checked sts == DRV_OK
 
 //# DEFINE trash_msg_print_fmt_args , uid=%u
 //# DEFINE trash_msg_print_pass_args , msg->uid
 
+//# DEFINE commit_cmds_print_args
+       proxy_flush_checked_cmds( ctx );
+//# END
+
+//# DEFINE cancel_cmds_print_cb_args
+       proxy_cancel_checked_cmds( cmd->ctx );
+//# END
+
+//# DEFINE free_store_print_args
+       proxy_cancel_checked_cmds( ctx );
+//# END
 //# DEFINE free_store_action
        proxy_store_deref( ctx );
 //# END
 
+//# DEFINE cancel_store_print_args
+       proxy_cancel_checked_cmds( ctx );
+//# END
 //# DEFINE cancel_store_action
        proxy_store_deref( ctx );
 //# END
@@ -325,9 +431,12 @@ proxy_alloc_store( store_t *real_ctx, const char *label )
        ctx->gen.conf = real_ctx->conf;
        ctx->ref_count = 1;
        ctx->label = label;
+       ctx->done_cmds_append = &ctx->done_cmds;
+       ctx->check_cmds_append = &ctx->check_cmds;
        ctx->real_driver = real_ctx->driver;
        ctx->real_store = real_ctx;
        ctx->real_driver->set_bad_callback( ctx->real_store, (void (*)(void *))proxy_invoke_bad_callback, ctx );
+       init_wakeup( &ctx->wakeup, proxy_wakeup, ctx );
        return &ctx->gen;
 }
 
diff --git a/src/drv_proxy_gen.pl b/src/drv_proxy_gen.pl
index 8d41249..b3a3ce0 100755
--- a/src/drv_proxy_gen.pl
+++ b/src/drv_proxy_gen.pl
@@ -150,11 +150,26 @@ for (@ptypes) {
        } else {
                if ($cmd_type eq "void " && $cmd_args =~ s/, void \(\*cb\)\( (.*)void \*aux \), void \*aux$//) {
                        my $cmd_cb_args = $1;
-                       $replace{'decl_cb_args'} = $cmd_cb_args;
-                       $replace{'pass_cb_args'} = make_args($cmd_cb_args);
-                       my $cmd_print_cb_args = $cmd_cb_args =~ s/(.*), $/, $1/r;
-                       $replace{'print_pass_cb_args'} = make_args($cmd_print_cb_args);
-                       $replace{'print_fmt_cb_args'} = make_format($cmd_print_cb_args);
+                       if (length($cmd_cb_args)) {
+                               $replace{'decl_cb_args'} = $cmd_cb_args;
+                               my $r_cmd_cb_args = $cmd_cb_args;
+                               $r_cmd_cb_args =~ s/^int sts, // or die("Callback arguments of $cmd_name don't start with sts.\n");
+                               $replace{'decl_cb_state'} = $r_cmd_cb_args =~ s/, /\;\n/gr;
+                               my $pass_cb_args = make_args($cmd_cb_args);
+                               $replace{'save_cb_args'} = $pass_cb_args =~ s/([^,]+), /cmd->$1 = $1\;\n/gr;
+                               $pass_cb_args =~ s/([^, ]+)/cmd->$1/g;
+                               $replace{'pass_cb_args'} = $pass_cb_args;
+                               $replace{'print_pass_cb_args'} = $pass_cb_args =~ s/(.*), $/, $1/r;
+                               $replace{'print_fmt_cb_args'} = make_format($cmd_cb_args =~ s/(.*), $/, $1/r);
+                               $replace{'gen_cmd_t'} = "gen_sts_cmd_t";
+                               $replace{'GEN_CMD'} = "GEN_STS_CMD\n";
+                               $replace{'gen_cmd'} = "&cmd->gen.gen";
+                       } else {
+                               $replace{'gen_cmd_t'} = "gen_cmd_t";
+                               $replace{'GEN_CMD'} = "GEN_CMD\n";
+                               $replace{'gen_cmd'} = "&cmd->gen";
+                       }
+                       $replace{'checked'} //= '0';
                        $template = "CALLBACK";
                } elsif ($cmd_type eq "void ") {
                        $template = "REGULAR_VOID";
diff --git a/src/main.c b/src/main.c
index 7cc513e..60623ec 100644
--- a/src/main.c
+++ b/src/main.c
@@ -713,6 +713,9 @@ main( int argc, char **argv )
                case 'T':
                        for (; *ochar; ) {
                                switch (*ochar++) {
+                               case 'a':
+                                       DFlags |= FORCEASYNC;
+                                       break;
                                case 'j':
                                        DFlags |= KEEPJOURNAL;
                                        JLimit = strtol( ochar, &ochar, 10 );
@@ -861,20 +864,23 @@ sync_chans( main_vars_t *mvars, int ent )
                if (mvars->skip)
                        goto next2;
                mvars->state[F] = mvars->state[N] = ST_FRESH;
-               if ((DFlags & DEBUG_DRV) || (mvars->chan->stores[F]->driver->get_caps( NULL ) & mvars->chan->stores[N]->driver->get_caps( NULL ) & DRV_VERBOSE))
+               uint dcaps[2];
+               for (t = 0; t < 2; t++) {
+                       mvars->drv[t] = mvars->chan->stores[t]->driver;
+                       dcaps[t] = mvars->drv[t]->get_caps( NULL );
+               }
+               if ((DFlags & DEBUG_DRV) || (dcaps[F] & dcaps[N] & DRV_VERBOSE))
                        labels[F] = "F: ", labels[N] = "N: ";
                else
                        labels[F] = labels[N] = "";
                for (t = 0; t < 2; t++) {
-                       driver_t *drv = mvars->chan->stores[t]->driver;
-                       store_t *ctx = drv->alloc_store( mvars->chan->stores[t], labels[t] );
-                       if (DFlags & DEBUG_DRV) {
-                               drv = &proxy_driver;
+                       store_t *ctx = mvars->drv[t]->alloc_store( mvars->chan->stores[t], labels[t] );
+                       if ((DFlags & DEBUG_DRV) || ((DFlags & FORCEASYNC) && !(dcaps[t] & DRV_ASYNC))) {
+                               mvars->drv[t] = &proxy_driver;
                                ctx = proxy_alloc_store( ctx, labels[t] );
                        }
-                       mvars->drv[t] = drv;
                        mvars->ctx[t] = ctx;
-                       drv->set_bad_callback( ctx, store_bad, AUX );
+                       mvars->drv[t]->set_bad_callback( ctx, store_bad, AUX );
                }
                for (t = 0; ; t++) {
                        info( "Opening %s store %s...\n", str_fn[t], mvars->chan->stores[t]->name );
diff --git a/src/run-tests.pl b/src/run-tests.pl
index a73fb27..292aff6 100755
--- a/src/run-tests.pl
+++ b/src/run-tests.pl
@@ -334,10 +334,10 @@ sub killcfg()
        unlink ".mbsyncrc";
 }
 
-# $options
-sub runsync($$)
+# $run_async, $mbsync_options, $log_file
+sub runsync($$$)
 {
-       my ($flags, $file) = @_;
+       my ($async, $flags, $file) = @_;
 
        my $cmd;
        if ($use_vg) {
@@ -345,6 +345,7 @@ sub runsync($$)
        } else {
                $flags .= " -D";
        }
+       $flags .= " -Ta" if ($async);
        $cmd .= "$mbsync -Tz $flags -c .mbsyncrc test";
        open FILE, "$cmd 2>&1 |";
        my @out = <FILE>;
@@ -477,7 +478,7 @@ sub show($$$)
        showchan("near/.mbsyncstate");
        print ");\n";
        &writecfg(@sfx);
-       runsync("", "");
+       runsync(0, "", "");
        killcfg();
        print "my \@X$tx = (\n";
        showchan("near/.mbsyncstate");
@@ -681,18 +682,14 @@ sub readfile($)
        return @nj;
 }
 
-# $title, \@source_state, \@target_state, @channel_configs
-sub test($$$@)
+# $run_async, \@source_state, \@target_state, @channel_configs
+sub test_impl($$$@)
 {
-       my ($ttl, $sx, $tx, @sfx) = @_;
-
-       return 0 if (scalar(@ARGV) && !grep { $_ eq $ttl } @ARGV);
-       print "Testing: ".$ttl." ...\n";
-       &writecfg(@sfx);
+       my ($async, $sx, $tx, @sfx) = @_;
 
        mkchan($$sx[0], $$sx[1], @{ $$sx[2] });
 
-       my ($xc, @ret) = runsync("-Tj", "1-initial.log");
+       my ($xc, @ret) = runsync($async, "-Tj", "1-initial.log");
        if ($xc || ckchan("near/.mbsyncstate.new", $tx)) {
                print "Input:\n";
                printchan($sx);
@@ -710,7 +707,7 @@ sub test($$$@)
        }
 
        my @nj = readfile("near/.mbsyncstate.journal");
-       my ($jxc, @jret) = runsync("-0 --no-expunge", "2-replay.log");
+       my ($jxc, @jret) = runsync($async, "-0 --no-expunge", "2-replay.log");
        if ($jxc || ckstate("near/.mbsyncstate", @{ $$tx[2] })) {
                print "Journal replay failed.\n";
                print "Options:\n";
@@ -729,7 +726,7 @@ sub test($$$@)
                exit 1;
        }
 
-       my ($ixc, @iret) = runsync("", "3-verify.log");
+       my ($ixc, @iret) = runsync($async, "", "3-verify.log");
        if ($ixc || ckchan("near/.mbsyncstate", $tx)) {
                print "Idempotence verification run failed.\n";
                print "Input == Expected result:\n";
@@ -752,7 +749,7 @@ sub test($$$@)
        for (my $l = 1; $l <= $njl; $l++) {
                mkchan($$sx[0], $$sx[1], @{ $$sx[2] });
 
-               my ($nxc, @nret) = runsync("-Tj$l", "4-interrupt.log");
+               my ($nxc, @nret) = runsync($async, "-Tj$l", "4-interrupt.log");
                if ($nxc != (100 + ($l & 1)) << 8) {
                        print "Interrupting at step $l/$njl failed.\n";
                        print "Debug output:\n";
@@ -760,7 +757,7 @@ sub test($$$@)
                        exit 1;
                }
 
-               ($nxc, @nret) = runsync("-Tj", "5-resume.log");
+               ($nxc, @nret) = runsync($async, "-Tj", "5-resume.log");
                if ($nxc || ckchan("near/.mbsyncstate.new", $tx)) {
                        print "Resuming from step $l/$njl failed.\n";
                        print "Input:\n";
@@ -785,6 +782,19 @@ sub test($$$@)
                rmtree "near";
                rmtree "far";
        }
+}
+
+# $title, \@source_state, \@target_state, @channel_configs
+sub test($$$@)
+{
+       my ($ttl, $sx, $tx, @sfx) = @_;
+
+       return 0 if (scalar(@ARGV) && !grep { $_ eq $ttl } @ARGV);
+       print "Testing: ".$ttl." ...\n";
+       &writecfg(@sfx);
+
+       test_impl(0, $sx, $tx, @sfx);
+       test_impl(1, $sx, $tx, @sfx);
 
        killcfg();
 }


_______________________________________________
isync-devel mailing list
isync-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/isync-devel

Reply via email to