commit 6c08f568d0653fb35826b5d1c7cace56d2f02c5a
Author: Oswald Buddenhagen <o...@users.sf.net>
Date:   Sat May 9 19:17:41 2015 +0200

    fix socket_write() recursion
    
    the synchronous writing to the socket would have typically invoked the
    write callback, which would flush further commands, thus recursing.
    
    we take the easy way out and make it fully asynchronous, i.e., no data
    is sent before (re-)entering the event loop.
    
    this also has the effect that socket_write() cannot fail any more, and
    any errors will be reported asynchronously. this is consistent with
    socket_read(), and produces cleaner code.
    
    this introduces a marginal performance regression: the maildir driver is
    synchronous, so all messages (which fit into memory) will be read before
    any data is sent. this is not considered relevant.

 src/drv_imap.c |   90 +++++++++++++++++++++---------------------------
 src/socket.c   |   30 +++++++---------
 src/socket.h   |    4 ++-
 3 files changed, 55 insertions(+), 69 deletions(-)

diff --git a/src/drv_imap.c b/src/drv_imap.c
index f8618d1..6a1394d 100644
--- a/src/drv_imap.c
+++ b/src/drv_imap.c
@@ -266,7 +266,7 @@ done_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, int response )
        free( cmd );
 }
 
-static int
+static void
 send_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
 {
        int bufl, litplus, iovcnt = 1;
@@ -312,19 +312,13 @@ send_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
                iov[2].takeOwn = KeepOwn;
                iovcnt = 3;
        }
-       if (socket_write( &ctx->conn, iov, iovcnt ) < 0)
-               goto bail;
+       socket_write( &ctx->conn, iov, iovcnt );
        if (cmd->param.to_trash && ctx->trashnc == TrashUnknown)
                ctx->trashnc = TrashChecking;
        cmd->next = 0;
        *ctx->in_progress_append = cmd;
        ctx->in_progress_append = &cmd->next;
        ctx->num_in_progress++;
-       return 0;
-
-  bail:
-       done_imap_cmd( ctx, cmd, RESP_CANCEL );
-       return -1;
 }
 
 static int
@@ -346,11 +340,10 @@ flush_imap_cmds( imap_store_t *ctx )
 {
        struct imap_cmd *cmd;
 
-       while ((cmd = ctx->pending) && cmd_submittable( ctx, cmd )) {
+       if ((cmd = ctx->pending) && cmd_submittable( ctx, cmd )) {
                if (!(ctx->pending = cmd->next))
                        ctx->pending_append = &ctx->pending;
-               if (send_imap_cmd( ctx, cmd ) < 0)
-                       return -1;
+               send_imap_cmd( ctx, cmd );
        }
        return 0;
 }
@@ -379,7 +372,7 @@ cancel_submitted_imap_cmds( imap_store_t *ctx )
        }
 }
 
-static int
+static void
 submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
 {
        assert( ctx );
@@ -396,10 +389,9 @@ submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
                        *ctx->pending_append = cmd;
                        ctx->pending_append = &cmd->next;
                }
-               return 0;
+       } else {
+               send_imap_cmd( ctx, cmd );
        }
-
-       return send_imap_cmd( ctx, cmd );
 }
 
 /* Minimal printf() replacement that supports an %\s format sequence to print backslash-escaped
@@ -484,7 +476,7 @@ imap_vprintf( const char *fmt, va_list ap )
        }
 }
 
-static int
+static void
 imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp,
            void (*done)( imap_store_t *ctx, struct imap_cmd *cmd, int response ),
            const char *fmt, ... )
@@ -497,7 +489,7 @@ imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp,
        va_start( ap, fmt );
        cmdp->cmd = imap_vprintf( fmt, ap );
        va_end( ap );
-       return submit_imap_cmd( ctx, cmdp );
+       submit_imap_cmd( ctx, cmdp );
 }
 
 static void
@@ -1332,8 +1324,7 @@ imap_socket_read( void *aux )
                                iov[1].buf = "\r\n";
                                iov[1].len = 2;
                                iov[1].takeOwn = KeepOwn;
-                               if (socket_write( &ctx->conn, iov, 2 ) < 0)
-                                       return;
+                               socket_write( &ctx->conn, iov, 2 );
                        } else if (cmdp->param.cont) {
                                if (cmdp->param.cont( ctx, cmdp, cmd ))
                                        return;
@@ -1369,9 +1360,8 @@ imap_socket_read( void *aux )
                                                cmd2->orig_cmd = cmdp;
                                                cmd2->gen.param.high_prio = 1;
                                                p = strchr( cmdp->cmd, '"' );
-                                               if (imap_exec( ctx, &cmd2->gen, get_cmd_result_p2,
-                                                              "CREATE %.*s", imap_strchr( p + 1, '"' ) - p + 1, p ) < 0)
-                                                       return;
+                                               imap_exec( ctx, &cmd2->gen, get_cmd_result_p2,
+                                                          "CREATE %.*s", imap_strchr( p + 1, '"' ) - p + 1, p );
                                                continue;
                                        }
                                        resp = RESP_NO;
@@ -1402,8 +1392,7 @@ imap_socket_read( void *aux )
                                return;
                        }
                }
-               if (flush_imap_cmds( ctx ) < 0)
-                       return;
+               flush_imap_cmds( ctx );
        }
        imap_invoke_bad_callback( ctx );
 }
@@ -1921,7 +1910,8 @@ do_sasl_auth( imap_store_t *ctx, struct imap_cmd *cmdp ATTR_UNUSED, const char *
        iov[iovcnt].len = 2;
        iov[iovcnt].takeOwn = KeepOwn;
        iovcnt++;
-       return socket_write( &ctx->conn, iov, iovcnt );
+       socket_write( &ctx->conn, iov, iovcnt );
+       return 0;
 
   bail:
        imap_open_store_bail( ctx, FAIL_FINAL );
@@ -2281,7 +2271,7 @@ imap_prepare_load_box( store_t *gctx, int opts )
        gctx->opts = opts;
 }
 
-static int imap_submit_load( imap_store_t *, const char *, int, struct imap_cmd_refcounted_state * );
+static void imap_submit_load( imap_store_t *, const char *, int, struct imap_cmd_refcounted_state * );
 
 static void
 imap_load_box( store_t *gctx, int minuid, int maxuid, int newuid, int *excs, int nexcs,
@@ -2308,16 +2298,14 @@ imap_load_box( store_t *gctx, int minuid, int maxuid, int newuid, int *excs, int
                                if (i != j)
                                        bl += sprintf( buf + bl, ":%d", excs[i] );
                        }
-                       if (imap_submit_load( ctx, buf, 0, sts ) < 0)
-                               goto done;
+                       imap_submit_load( ctx, buf, 0, sts );
                }
                if (maxuid == INT_MAX)
                        maxuid = ctx->gen.uidnext ? ctx->gen.uidnext - 1 : 1000000000;
                if (maxuid >= minuid) {
                        if ((ctx->gen.opts & OPEN_FIND) && minuid < newuid) {
                                sprintf( buf, "%d:%d", minuid, newuid - 1 );
-                               if (imap_submit_load( ctx, buf, 0, sts ) < 0)
-                                       goto done;
+                               imap_submit_load( ctx, buf, 0, sts );
                                if (newuid > maxuid)
                                        goto done;
                                sprintf( buf, "%d:%d", newuid, maxuid );
@@ -2332,14 +2320,14 @@ imap_load_box( store_t *gctx, int minuid, int maxuid, int newuid, int *excs, int
        }
 }
 
-static int
+static void
 imap_submit_load( imap_store_t *ctx, const char *buf, int tuids, struct imap_cmd_refcounted_state *sts )
 {
-       return imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
-                         "UID FETCH %s (UID%s%s%s)", buf,
-                         (ctx->gen.opts & OPEN_FLAGS) ? " FLAGS" : "",
-                         (ctx->gen.opts & OPEN_SIZE) ? " RFC822.SIZE" : "",
-                         tuids ? " BODY.PEEK[HEADER.FIELDS (X-TUID)]" : "");
+       imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
+                  "UID FETCH %s (UID%s%s%s)", buf,
+                  (ctx->gen.opts & OPEN_FLAGS) ? " FLAGS" : "",
+                  (ctx->gen.opts & OPEN_SIZE) ? " RFC822.SIZE" : "",
+                  tuids ? " BODY.PEEK[HEADER.FIELDS (X-TUID)]" : "");
 }
 
 /******************* imap_fetch_msg *******************/
@@ -2396,15 +2384,15 @@ imap_make_flags( int flags, char *buf )
        return d;
 }
 
-static int
+static void
 imap_flags_helper( imap_store_t *ctx, int uid, char what, int flags,
                    struct imap_cmd_refcounted_state *sts )
 {
        char buf[256];
 
        buf[imap_make_flags( flags, buf )] = 0;
-       return imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_set_flags_p2,
-                         "UID STORE %d %cFLAGS.SILENT %s", uid, what, buf );
+       imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_set_flags_p2,
+                  "UID STORE %d %cFLAGS.SILENT %s", uid, what, buf );
 }
 
 static void
@@ -2422,8 +2410,10 @@ imap_set_msg_flags( store_t *gctx, message_t *msg, int uid, int add, int del,
        }
        if (add || del) {
                struct imap_cmd_refcounted_state *sts = imap_refcounted_new_state( cb, aux );
-               if ((add && imap_flags_helper( ctx, uid, '+', add, sts ) < 0) ||
-                   (del && imap_flags_helper( ctx, uid, '-', del, sts ) < 0)) {}
+               if (add)
+                       imap_flags_helper( ctx, uid, '+', add, sts );
+               if (del)
+                       imap_flags_helper( ctx, uid, '-', del, sts );
                imap_refcounted_done( sts );
        } else {
                cb( DRV_OK, aux );
@@ -2474,9 +2464,8 @@ imap_close_box( store_t *gctx,
                        }
                        if (!bl)
                                break;
-                       if (imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
-                                      "UID EXPUNGE %s", buf ) < 0)
-                               break;
+                       imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
+                                  "UID EXPUNGE %s", buf );
                }
                imap_refcounted_done( sts );
        } else {
@@ -2617,13 +2606,12 @@ imap_list_store( store_t *gctx, int flags,
        imap_store_t *ctx = (imap_store_t *)gctx;
        struct imap_cmd_refcounted_state *sts = imap_refcounted_new_state( cb, aux );
 
-       if (((flags & LIST_PATH) && (!(flags & LIST_INBOX) || !is_inbox( ctx, ctx->prefix, -1 )) &&
-            imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
-                       "LIST \"\" \"%\\s*\"", ctx->prefix ) < 0) ||
-           ((flags & LIST_INBOX) && (!(flags & LIST_PATH) || *ctx->prefix) &&
-            imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
-                       "LIST \"\" INBOX*" ) < 0))
-               {}
+       if ((flags & LIST_PATH) && (!(flags & LIST_INBOX) || !is_inbox( ctx, ctx->prefix, -1 )))
+               imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
+                          "LIST \"\" \"%\\s*\"", ctx->prefix );
+       if ((flags & LIST_INBOX) && (!(flags & LIST_PATH) || *ctx->prefix))
+               imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
+                          "LIST \"\" INBOX*" );
        imap_refcounted_done( sts );
 }
 
diff --git a/src/socket.c b/src/socket.c
index 5cde674..8a3b33b 100644
--- a/src/socket.c
+++ b/src/socket.c
@@ -755,6 +755,7 @@ do_queued_write( conn_t *conn )
                        return -1;
                if (n != len) {
                        conn->write_offset += n;
+                       conn->writing = 1;
                        return 0;
                }
                conn->write_offset = 0;
@@ -764,6 +765,7 @@ do_queued_write( conn_t *conn )
        if (conn->ssl && SSL_pending( conn->ssl ))
                conf_wakeup( &conn->ssl_fake, 0 );
 #endif
+       conn->writing = 0;
        return conn->write_callback( conn->callback_aux );
 }
 
@@ -787,6 +789,8 @@ do_flush( conn_t *conn )
 #ifdef HAVE_LIBZ
        if (conn->out_z) {
                int buf_avail = conn->append_avail;
+               if (!conn->z_written)
+                       return;
                do {
                        if (!bc) {
                                buf_avail = WRITE_CHUNK_SIZE;
@@ -812,6 +816,7 @@ do_flush( conn_t *conn )
                } while (!conn->out_z->avail_out);
                conn->append_buf = bc;
                conn->append_avail = buf_avail;
+               conn->z_written = 0;
        } else
 #endif
        if (bc) {
@@ -823,15 +828,15 @@ do_flush( conn_t *conn )
        }
 }
 
-int
+void
 socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt )
 {
        int i, buf_avail, len, offset = 0, total = 0;
-       buff_chunk_t *bc, *exwb = conn->write_buf;
+       buff_chunk_t *bc;
 
        for (i = 0; i < iovcnt; i++)
                total += iov[i].len;
-       if (total >= WRITE_CHUNK_SIZE && pending_wakeup( &conn->fd_fake )) {
+       if (total >= WRITE_CHUNK_SIZE) {
                /* If the new data is too big, queue the pending buffer to avoid latency. */
                do_flush( conn );
        }
@@ -870,6 +875,7 @@ socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt )
                                bc->len = (char *)conn->out_z->next_out - bc->data;
                                buf_avail = conn->out_z->avail_out;
                                len -= conn->out_z->avail_in;
+                               conn->z_written = 1;
                        } else
 #endif
                        {
@@ -898,17 +904,7 @@ socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt )
 #ifdef HAVE_LIBZ
        conn->append_avail = buf_avail;
 #endif
-       /* Queue the pending write once the main loop goes idle. */
-       conf_wakeup( &conn->fd_fake,
-#ifdef HAVE_LIBZ
-                    /* Always give zlib a chance to flush its internal buffer. */
-                    conn->out_z ||
-#endif
-                    bc ? 0 : -1 );
-       /* If no writes were queued before, ensure that flushing commences. */
-       if (!exwb)
-               return do_queued_write( conn );
-       return 0;
+       conf_wakeup( &conn->fd_fake, 0 );
 }
 
 static void
@@ -963,10 +959,10 @@ socket_fake_cb( void *aux )
 {
        conn_t *conn = (conn_t *)aux;
 
-       buff_chunk_t *exwb = conn->write_buf;
+       /* Ensure that a pending write gets queued. */
        do_flush( conn );
-       /* If no writes were queued before, ensure that flushing commences. */
-       if (!exwb)
+       /* If no writes are ongoing, start writing now. */
+       if (!conn->writing)
                do_queued_write( conn );
 }
 
diff --git a/src/socket.h b/src/socket.h
index 7ea6086..dac2576 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -83,6 +83,7 @@ typedef struct {
 #ifdef HAVE_LIBZ
        z_streamp in_z, out_z;
        wakeup_t z_fake;
+       int z_written;
 #endif
 
        void (*bad_callback)( void *aux ); /* async fail while sending or listening */
@@ -100,6 +101,7 @@ typedef struct {
        /* writing */
        buff_chunk_t *append_buf; /* accumulating buffer */
        buff_chunk_t *write_buf, **write_buf_append; /* buffer head & tail */
+       int writing;
 #ifdef HAVE_LIBZ
        int append_avail; /* space left in accumulating buffer */
 #endif
@@ -145,6 +147,6 @@ typedef struct conn_iovec {
        int len;
        ownership_t takeOwn;
 } conn_iovec_t;
-int socket_write( conn_t *sock, conn_iovec_t *iov, int iovcnt );
+void socket_write( conn_t *sock, conn_iovec_t *iov, int iovcnt );
 
 #endif

------------------------------------------------------------------------------
One dashboard for servers and applications across Physical-Virtual-Cloud 
Widest out-of-the-box monitoring support with 50+ applications
Performance metrics, stats and reports that give you Actionable Insights
Deep dive visibility with transaction tracing using APM Insight.
http://ad.doubleclick.net/ddm/clk/290420510;117567292;y
_______________________________________________
isync-devel mailing list
isync-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/isync-devel

Reply via email to