commit 1b040e926f618f75b4fea0d797c14ff4d2c160d4
Author: Oswald Buddenhagen <o...@users.sf.net>
Date:   Sun Feb 15 17:37:33 2015 +0100

    soft-limit peak memory usage
    
    propagating many messages from a fast store (typically maildir or a
    local IMAP server) to a slow asynchronous store could cause gigabytes of
    data being buffered. avoid this by throttling fetches if the target
    context reports memory usage above a configurable limit.
    
    REFMAIL: 9737edb14457c71af4ed156c1be0a...@mpcjanssen.nl

 src/common.h      |    2 +
 src/config.c      |    8 +++++++
 src/driver.h      |    3 ++
 src/drv_imap.c    |   20 ++++++++++++++++-
 src/drv_maildir.c |    7 ++++++
 src/main.c        |    2 +
 src/mbsync.1      |    8 +++++++
 src/socket.c      |    2 +
 src/socket.h      |    1 +
 src/sync.c        |   51 ++++++++++++++++++++++++++++----------------
 10 files changed, 84 insertions(+), 20 deletions(-)

diff --git a/src/common.h b/src/common.h
index 06c2e3e..ed278b4 100644
--- a/src/common.h
+++ b/src/common.h
@@ -76,6 +76,8 @@ extern int Pid;
 extern char Hostname[256];
 extern const char *Home;
 
+extern int BufferLimit;
+
 /* util.c */
 
 void ATTR_PRINTFLIKE(1, 2) debug( const char *, ... );
diff --git a/src/config.c b/src/config.c
index 1c181ce..20f09d9 100644
--- a/src/config.c
+++ b/src/config.c
@@ -482,6 +482,14 @@ load_config( const char *where, int pseudo )
                                }
                        }
                }
+               else if (!strcasecmp( "BufferLimit", cfile.cmd ))
+               {
+                       BufferLimit = parse_size( &cfile );
+                       if (BufferLimit <= 0) {
+                               error( "%s:%d: BufferLimit must be positive\n", 
cfile.file, cfile.line );
+                               cfile.err = 1;
+                       }
+               }
                else if (!getopt_helper( &cfile, &gcops, &global_conf ))
                {
                        error( "%s:%d: unknown section keyword '%s'\n",
diff --git a/src/driver.h b/src/driver.h
index e79a096..3aa30a9 100644
--- a/src/driver.h
+++ b/src/driver.h
@@ -246,6 +246,9 @@ struct driver {
 
        /* Commit any pending set_msg_flags() commands. */
        void (*commit_cmds)( store_t *ctx );
+
+       /* Get approximate amount of memory occupied by the driver. */
+       int (*memory_usage)( store_t *ctx );
 };
 
 void free_generic_messages( message_t * );
diff --git a/src/drv_imap.c b/src/drv_imap.c
index 7146d26..2620f49 100644
--- a/src/drv_imap.c
+++ b/src/drv_imap.c
@@ -111,6 +111,7 @@ typedef struct imap_store {
        int nexttag, num_in_progress;
        struct imap_cmd *pending, **pending_append;
        struct imap_cmd *in_progress, **in_progress_append;
+       int buffer_mem; /* memory currently occupied by buffers in the queue */
 
        /* Used during sequential operations like connect */
        enum { GreetingPending = 0, GreetingBad, GreetingOk, GreetingPreauth } 
greeting;
@@ -256,7 +257,10 @@ static void
 done_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, int response )
 {
        cmd->param.done( ctx, cmd, response );
-       free( cmd->param.data );
+       if (cmd->param.data) {
+               free( cmd->param.data );
+               ctx->buffer_mem -= cmd->param.data_len;
+       }
        free( cmd->cmd );
        free( cmd );
 }
@@ -299,6 +303,7 @@ send_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
                iov[1].len = cmd->param.data_len;
                iov[1].takeOwn = GiveOwn;
                cmd->param.data = 0;
+               ctx->buffer_mem -= cmd->param.data_len;
                iov[2].buf = "\r\n";
                iov[2].len = 2;
                iov[2].takeOwn = KeepOwn;
@@ -1317,6 +1322,7 @@ imap_socket_read( void *aux )
                                iov[0].len = cmdp->param.data_len;
                                iov[0].takeOwn = GiveOwn;
                                cmdp->param.data = 0;
+                               ctx->buffer_mem -= cmdp->param.data_len;
                                iov[1].buf = "\r\n";
                                iov[1].len = 2;
                                iov[1].takeOwn = KeepOwn;
@@ -2507,6 +2513,7 @@ imap_store_msg( store_t *gctx, msg_data_t *data, int 
to_trash,
        flagstr[d] = 0;
 
        INIT_IMAP_CMD(imap_cmd_out_uid, cmd, cb, aux)
+       ctx->buffer_mem += data->len;
        cmd->gen.param.data_len = data->len;
        cmd->gen.param.data = data->data;
        cmd->out_uid = -2;
@@ -2621,6 +2628,16 @@ imap_commit_cmds( store_t *gctx )
        (void)gctx;
 }
 
+/******************* imap_memory_usage *******************/
+
+static int
+imap_memory_usage( store_t *gctx )
+{
+       imap_store_t *ctx = (imap_store_t *)gctx;
+
+       return ctx->buffer_mem + ctx->conn.buffer_mem;
+}
+
 /******************* imap_parse_store *******************/
 
 imap_server_conf_t *servers, **serverapp = &servers;
@@ -2899,4 +2916,5 @@ struct driver imap_driver = {
        imap_close_box,
        imap_cancel_cmds,
        imap_commit_cmds,
+       imap_memory_usage,
 };
diff --git a/src/drv_maildir.c b/src/drv_maildir.c
index 27073eb..29d4c2e 100644
--- a/src/drv_maildir.c
+++ b/src/drv_maildir.c
@@ -1606,6 +1606,12 @@ maildir_commit_cmds( store_t *gctx )
 }
 
 static int
+maildir_memory_usage( store_t *gctx ATTR_UNUSED )
+{
+       return 0;
+}
+
+static int
 maildir_parse_store( conffile_t *cfg, store_conf_t **storep )
 {
        maildir_store_conf_t *store;
@@ -1672,4 +1678,5 @@ struct driver maildir_driver = {
        maildir_close_box,
        maildir_cancel_cmds,
        maildir_commit_cmds,
+       maildir_memory_usage,
 };
diff --git a/src/main.c b/src/main.c
index beb6059..f587e2d 100644
--- a/src/main.c
+++ b/src/main.c
@@ -43,6 +43,8 @@ int Pid;              /* for maildir and imap */
 char Hostname[256];    /* for maildir */
 const char *Home;      /* for config */
 
+int BufferLimit = 10 * 1024 * 1024;
+
 static void
 version( void )
 {
diff --git a/src/mbsync.1 b/src/mbsync.1
index e5ad1fb..02e89ad 100644
--- a/src/mbsync.1
+++ b/src/mbsync.1
@@ -583,6 +583,14 @@ This option is meaningless for \fBSyncState\fR if the 
latter is \fB*\fR,
 obviously. However, it also determines the default of \fBInfoDelimiter\fR.
 (Global default: \fI;\fR on Windows, \fI:\fR everywhere else)
 ..
+.TP
+\fBBufferLimit\fR \fIsize\fR[\fBk\fR|\fBm\fR][\fBb\fR]
+The per-Channel, per-direction instantaneous memory usage above which
+\fBmbsync\fR will refrain from using more memory. Note that this is no
+absolute limit, as even a single message can consume more memory than
+this.
+(Default: \fI10M\fR)
+..
 .SH RECOMMENDATIONS
 Make sure your IMAP server does not auto-expunge deleted messages - it is
 slow, and semantically somewhat questionable. Specifically, Gmail needs to
diff --git a/src/socket.c b/src/socket.c
index b7de54b..5cde674 100644
--- a/src/socket.c
+++ b/src/socket.c
@@ -737,6 +737,7 @@ dispose_chunk( conn_t *conn )
        buff_chunk_t *bc = conn->write_buf;
        if (!(conn->write_buf = bc->next))
                conn->write_buf_append = &conn->write_buf;
+       conn->buffer_mem -= bc->len;
        free( bc );
 }
 
@@ -770,6 +771,7 @@ static void
 do_append( conn_t *conn, buff_chunk_t *bc )
 {
        bc->next = 0;
+       conn->buffer_mem += bc->len;
        *conn->write_buf_append = bc;
        conn->write_buf_append = &bc->next;
 }
diff --git a/src/socket.h b/src/socket.h
index a420e49..7ea6086 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -104,6 +104,7 @@ typedef struct {
        int append_avail; /* space left in accumulating buffer */
 #endif
        int write_offset; /* offset into buffer head */
+       int buffer_mem; /* memory currently occupied by buffers in the queue */
 
        /* reading */
        int offset; /* start of filled bytes in buffer */
diff --git a/src/sync.c b/src/sync.c
index 736f9ca..7635e6e 100644
--- a/src/sync.c
+++ b/src/sync.c
@@ -154,6 +154,7 @@ typedef struct {
        store_t *ctx[2];
        driver_t *drv[2];
        const char *orig_name[2];
+       message_t *new_msgs[2];
        int state[2], ref_count, nsrecs, ret, lfd, existing, replayed;
        int new_total[2], new_done[2];
        int flags_total[2], flags_done[2];
@@ -194,7 +195,7 @@ static int check_cancel( sync_vars_t *svars );
 
 #define ST_LOADED          (1<<0)
 #define ST_FIND_OLD        (1<<1)
-#define ST_SENT_NEW        (1<<2)
+#define ST_SENDING_NEW     (1<<2)
 #define ST_FIND_NEW        (1<<3)
 #define ST_FOUND_NEW       (1<<4)
 #define ST_SENT_FLAGS      (1<<5)
@@ -1336,7 +1337,6 @@ box_loaded( int sts, void *aux )
        sync_rec_t *srec;
        sync_rec_map_t *srecmap;
        message_t *tmsg;
-       copy_vars_t *cv;
        flag_vars_t *fv;
        int uid, no[2], del[2], alive, todel, t1, t2;
        int sflags, nflags, aflags, dflags, nex;
@@ -1724,21 +1724,7 @@ box_loaded( int sts, void *aux )
        for (t = 0; t < 2; t++) {
                svars->newuid[t] = svars->ctx[t]->uidnext;
                Fprintf( svars->jfp, "%c %d\n", "{}"[t], svars->newuid[t] );
-               for (tmsg = svars->ctx[1-t]->msgs; tmsg; tmsg = tmsg->next) {
-                       if ((srec = tmsg->srec) && srec->tuid[0]) {
-                               svars->new_total[t]++;
-                               stats( svars );
-                               cv = nfmalloc( sizeof(*cv) );
-                               cv->cb = msg_copied;
-                               cv->aux = AUX;
-                               cv->srec = srec;
-                               cv->msg = tmsg;
-                               copy_msg( cv );
-                               if (check_cancel( svars ))
-                                       goto out;
-                       }
-               }
-               svars->state[t] |= ST_SENT_NEW;
+               svars->new_msgs[t] = svars->ctx[1-t]->msgs;
                msgs_copied( svars, t );
                if (check_cancel( svars ))
                        goto out;
@@ -1809,11 +1795,38 @@ static void sync_close( sync_vars_t *svars, int t );
 static void
 msgs_copied( sync_vars_t *svars, int t )
 {
-       if (!(svars->state[t] & ST_SENT_NEW) || svars->new_done[t] < 
svars->new_total[t])
+       message_t *tmsg;
+       sync_rec_t *srec;
+       copy_vars_t *cv;
+
+       if (svars->state[t] & ST_SENDING_NEW)
                return;
 
        sync_ref( svars );
 
+       for (tmsg = svars->new_msgs[t]; tmsg; tmsg = tmsg->next) {
+               if ((srec = tmsg->srec) && srec->tuid[0]) {
+                       if (svars->drv[t]->memory_usage( svars->ctx[t] ) >= 
BufferLimit)
+                               break;
+                       svars->new_total[t]++;
+                       stats( svars );
+                       svars->state[t] |= ST_SENDING_NEW;
+                       cv = nfmalloc( sizeof(*cv) );
+                       cv->cb = msg_copied;
+                       cv->aux = AUX;
+                       cv->srec = srec;
+                       cv->msg = tmsg;
+                       copy_msg( cv );
+                       svars->state[t] &= ~ST_SENDING_NEW;
+                       if (check_cancel( svars ))
+                               goto out;
+               }
+       }
+       svars->new_msgs[t] = tmsg;
+
+       if (tmsg || svars->new_done[t] < svars->new_total[t])
+               goto out;
+
        Fprintf( svars->jfp, "%c %d\n", ")("[t], svars->maxuid[1-t] );
        sync_close( svars, 1-t );
        if (check_cancel( svars ))
@@ -2002,7 +2015,7 @@ static void
 sync_close( sync_vars_t *svars, int t )
 {
        if ((~svars->state[t] & (ST_FOUND_NEW|ST_SENT_TRASH)) || 
svars->trash_done[t] < svars->trash_total[t] ||
-           !(svars->state[1-t] & ST_SENT_NEW) || svars->new_done[1-t] < 
svars->new_total[1-t])
+           svars->new_msgs[1-t] || svars->new_done[1-t] < 
svars->new_total[1-t])
                return;
 
        if (svars->state[t] & ST_CLOSING)

------------------------------------------------------------------------------
Dive into the World of Parallel Programming. The Go Parallel Website,
sponsored by Intel and developed in partnership with Slashdot Media, is your
hub for all things parallel software development, from weekly thought
leadership blogs to news, videos, case studies, tutorials and more. Take a
look and join the conversation now. http://goparallel.sourceforge.net/
_______________________________________________
isync-devel mailing list
isync-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/isync-devel

Reply via email to