Changeset: 020854aba345 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=020854aba345
Added Files:
        clients/mapilib/ChangeLog
        common/stream/ChangeLog
Modified Files:
        clients/Tests/exports.stable.out
        clients/mapilib/mapi.c
        clients/odbc/driver/SQLConnect.c
        clients/odbc/driver/SQLSetConnectAttr.c
        common/stream/stream.c
        common/stream/stream.h
        monetdb5/mal/mal_client.c
Branch: default
Log Message:

Implemented timeout callback function.

You can specify a read and write timeout on a stream (this is only
implemented, and only really interesting, for socket streams), which
causes the read/write to be aborted after the specified timeout.  If
no data has been transferred, the function returns an error.  If you
specify a callback function, it is called when the timeout happens:
if it returns FALSE (0), the read/write is tried again; if it returns
TRUE (non-zero), or if no callback function was specified, the
read/write is aborted.

The timeout value was changed from seconds to milliseconds.


diffs (truncated from 313 to 300 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -2984,7 +2984,7 @@ ssize_t mnstr_read_block(stream *s, void
 ssize_t mnstr_readline(stream *s, void *buf, size_t maxcnt);
 stream *mnstr_rstream(stream *s);
 void mnstr_set_byteorder(stream *s, char bigendian);
-void mnstr_settimeout(stream *s, unsigned int secs);
+void mnstr_settimeout(stream *s, unsigned int ms, int( *func)(void));
 int mnstr_type(stream *s);
 ssize_t mnstr_write(stream *s, const void *buf, size_t elmsize, size_t cnt);
 int mnstr_writeBte(stream *s, signed char val);
diff --git a/clients/mapilib/ChangeLog b/clients/mapilib/ChangeLog
new file mode 100644
--- /dev/null
+++ b/clients/mapilib/ChangeLog
@@ -0,0 +1,6 @@
+# ChangeLog file for mapilib
+# This file is updated with Maddlog
+
+* Fri Mar 28 2014 Sjoerd Mullender <[email protected]>
+- Changed mapi_timeout argument from seconds to milliseconds.
+
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -2988,8 +2988,8 @@ mapi_timeout(Mapi mid, unsigned int time
        mapi_check(mid, "mapi_timeout");
        if (mid->trace == MAPI_TRACE)
                printf("Set timeout to %u\n", timeout);
-       mnstr_settimeout(mid->to, timeout);
-       mnstr_settimeout(mid->from, timeout);
+       mnstr_settimeout(mid->to, timeout, NULL);
+       mnstr_settimeout(mid->from, timeout, NULL);
        return MOK;
 }
 
diff --git a/clients/odbc/driver/SQLConnect.c b/clients/odbc/driver/SQLConnect.c
--- a/clients/odbc/driver/SQLConnect.c
+++ b/clients/odbc/driver/SQLConnect.c
@@ -267,7 +267,7 @@ SQLConnect_(ODBCDbc *dbc,
                    (dbc->major == 11 && dbc->minor >= 5))
                        mapi_set_size_header(mid, 1);
                /* set timeout after we're connected */
-               mapi_timeout(mid, dbc->sql_attr_connection_timeout);
+               mapi_timeout(mid, dbc->sql_attr_connection_timeout * 1000);
        }
 
        return rc;
diff --git a/clients/odbc/driver/SQLSetConnectAttr.c b/clients/odbc/driver/SQLSetConnectAttr.c
--- a/clients/odbc/driver/SQLSetConnectAttr.c
+++ b/clients/odbc/driver/SQLSetConnectAttr.c
@@ -99,7 +99,7 @@ SQLSetConnectAttr_(ODBCDbc *dbc,
        case SQL_ATTR_CONNECTION_TIMEOUT:
                dbc->sql_attr_connection_timeout = (SQLUINTEGER) (size_t) ValuePtr;
                if (dbc->mid)
-                       mapi_timeout(dbc->mid, dbc->sql_attr_connection_timeout);
+                       mapi_timeout(dbc->mid, dbc->sql_attr_connection_timeout * 1000);
                break;
        case SQL_ATTR_TXN_ISOLATION:
                /* nothing to change, we only do the highest level */
diff --git a/common/stream/ChangeLog b/common/stream/ChangeLog
new file mode 100644
--- /dev/null
+++ b/common/stream/ChangeLog
@@ -0,0 +1,10 @@
+# ChangeLog file for stream
+# This file is updated with Maddlog
+
+* Fri Mar 28 2014 Sjoerd Mullender <[email protected]>
+- Changed mnstr_settimeout function so that the specified timeout is now
+  in milliseconds (used to be seconds), and that it also needs an extra
+  argument specifying a callback function (no arguments, int result)
+  that should return TRUE if the timeout should cause the function to
+  abort, or FALSE if it should continue what it was doing.
+
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -141,7 +141,8 @@ struct stream {
        char isutf8;            /* known to be UTF-8 due to BOM */
        short type;             /* ascii/binary */
        char *name;
-       unsigned int timeout;
+       unsigned int timeout;      /* timeout in ms */
+       int (*timeout_func)(void); /* callback function: NULL/true -> return */
        union {
                void *p;
                int i;
@@ -159,11 +160,6 @@ struct stream {
        int (*fgetpos) (stream *s, lng *p);
        int (*fsetpos) (stream *s, lng p);
        void (*update_timeout) (stream *s);
-       /* in case read() read a non-integral number of elements we
-        * save the last partial element here (only used in
-        * socket_read() */
-       void *buf;
-       size_t len;
 };
 
 int
@@ -276,9 +272,10 @@ mnstr_write(stream *s, const void *buf, 
 }
 
 void
-mnstr_settimeout(stream *s, unsigned int secs)
+mnstr_settimeout(stream *s, unsigned int ms, int (*func)(void))
 {
-       s->timeout = secs;
+       s->timeout = ms;
+       s->timeout_func = func;
        if (s->update_timeout)
                (*s->update_timeout)(s);
 }
@@ -481,8 +478,6 @@ get_extention(const char *file)
 static void
 destroy(stream *s)
 {
-       if (s->buf)
-               free(s->buf);
        free(s->name);
        free(s);
 }
@@ -535,9 +530,8 @@ create_stream(const char *name)
        s->fgetpos = NULL;
        s->fsetpos = NULL;
        s->timeout = 0;
+       s->timeout_func = NULL;
        s->update_timeout = NULL;
-       s->buf = NULL;
-       s->len = 0;
 #ifdef STREAM_DEBUG
        printf("create_stream %s -> " PTRFMT "\n", name ? name : "<unnamed>", PTRFMTCAST s);
 #endif
@@ -1510,9 +1504,12 @@ socket_write(stream *s, const void *buf,
                ((nr = write(s->stream_data.s, ((const char *) buf + res),
                             size - res)) > 0)
 #endif
-               || (s->timeout == 0
-                   && (errno == EAGAIN || errno == EWOULDBLOCK))
-               || errno == EINTR)
+               || (nr < 0 &&   /* syscall failed */
+                   s->timeout > 0 && /* potentially timeout */
+                   (errno == EAGAIN || errno == EWOULDBLOCK) && /* it was! */
+                   s->timeout_func != NULL && /* callback function exists */
+                   !(*s->timeout_func)())     /* callback says don't stop */
+               || (nr < 0 && errno == EINTR)) /* interrupted */
                ) {
                errno = 0;
                if (nr > 0)
@@ -1538,20 +1535,19 @@ socket_read(stream *s, void *buf, size_t
        if (!s || s->errnr || size == 0)
                return -1;
 
-       assert((s->buf == NULL) == (s->len == 0));
-       if (s->buf) {
-               assert((size_t) size > s->len);
-               memcpy(buf, s->buf, s->len);
-       }
-
+       do {
+               errno = 0;
 #ifdef NATIVE_WIN32
-       if (size > INT_MAX)
-               size = elmsize * (INT_MAX / elmsize);
-       nr = recv(s->stream_data.s, (char *) buf + s->len, (int) (size - s->len), 0);
+               if (size > INT_MAX)
+                       size = elmsize * (INT_MAX / elmsize);
+               nr = recv(s->stream_data.s, buf, (int) size, 0);
 #else
-       nr = read(s->stream_data.s, (char *) buf + s->len, size - s->len);
+               nr = read(s->stream_data.s, buf, size);
 #endif
-       if (nr == -1) {
+       } while (nr == -1 && s->timeout > 0 &&
+                (errno == EAGAIN || errno == EWOULDBLOCK) &&
+                s->timeout_func && !(*s->timeout_func)());
+       if (nr < 0) {
                if (s->timeout > 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
                        s->errnr = MNSTR_TIMEOUT;
                else
@@ -1560,17 +1556,30 @@ socket_read(stream *s, void *buf, size_t
        }
        if (nr == 0)
                return 0;       /* end of file */
-       if (s->buf) {
-               nr += s->len;
-               free(s->buf);
-               s->buf = NULL;
-               s->len = 0;
-       }
-       if (elmsize > 1 && (cnt = nr % elmsize) != 0) {
-               s->buf = malloc(cnt);
-               memcpy(s->buf, (char *) buf + nr - cnt, cnt);
-               s->len = cnt;
-               nr -= cnt;
+       while (elmsize > 1 && nr % elmsize != 0) {
+               /* if elmsize > 1, we really expect that "the other
+                * side" wrote complete items in a single system call,
+                * so we expect to at least receive complete items,
+                * and hence we continue reading until we did in fact
+                * receive an integral number of complete items,
+                * ignoring any timeouts (but not real errors)
+                * (note that recursion is limited since we don't
+                * propagate the element size to the recursive
+                * call) */
+               ssize_t n;
+               n = socket_read(s, (char *) buf + nr, 1, (size_t) (size - nr));
+               if (n < 0) {
+                       if (s->errnr == MNSTR_TIMEOUT) {
+                               /* ignore timeout */
+                               s->errnr = MNSTR_NO__ERROR;
+                               continue;
+                       }
+                       /* some other read error is serious */
+                       return -1;
+               }
+               if (n == 0)     /* unexpected end of file */
+                       break;
+               nr += n;
        }
        return (ssize_t) (nr / elmsize);
 }
@@ -1598,10 +1607,6 @@ socket_close(stream *s)
                }
        }
        s->stream_data.s = INVALID_SOCKET;
-       if (s->buf)
-               free(s->buf);
-       s->buf = NULL;
-       s->len = 0;
 }
 
 static void
@@ -1610,11 +1615,13 @@ socket_update_timeout(stream *s)
        SOCKET fd = s->stream_data.s;
        struct timeval tv;
 
-       tv.tv_sec = s->timeout;
-       tv.tv_usec = 0;
+       tv.tv_sec = s->timeout / 1000;
+       tv.tv_usec = (s->timeout % 1000) * 1000;
        /* cast to char * for Windows, no harm on "normal" systems */
-       setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, (socklen_t) sizeof(tv));
-       setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, (socklen_t) sizeof(tv));
+       if (s->access == ST_READ)
+               setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, (socklen_t) sizeof(tv));
+       if (s->access == ST_WRITE)
+               setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, (socklen_t) sizeof(tv));
 }
 
 static stream *
@@ -2222,6 +2229,7 @@ ic_update_timeout(stream *s)
 
        if (ic && ic->s) {
                ic->s->timeout = s->timeout;
+               ic->s->timeout_func = s->timeout_func;
                if (ic->s->update_timeout)
                        (*ic->s->update_timeout)(ic->s);
        }
@@ -2764,6 +2772,7 @@ bs_update_timeout(stream *ss)
        bs *s = ss->stream_data.p;
        if (s && s->s) {
                s->s->timeout = ss->timeout;
+               s->s->timeout_func = ss->timeout_func;
                if (s->s->update_timeout)
                        (*s->s->update_timeout)(s->s);
        }
@@ -3280,6 +3289,7 @@ wbs_update_timeout(stream *s)
        wbs_stream *wbs = (wbs_stream *) s->stream_data.p;
        if (wbs && wbs->s) {
                wbs->s->timeout = s->timeout;
+               wbs->s->timeout_func = s->timeout_func;
                if (wbs->s->update_timeout)
                        (*wbs->s->update_timeout)(wbs->s);
        }
diff --git a/common/stream/stream.h b/common/stream/stream.h
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -132,7 +132,7 @@ stream_export int mnstr_byteorder(stream
 stream_export void mnstr_set_byteorder(stream *s, char bigendian);
 stream_export stream *mnstr_rstream(stream *s);
 stream_export stream *mnstr_wstream(stream *s);
-stream_export void mnstr_settimeout(stream *s, unsigned int secs);
+stream_export void mnstr_settimeout(stream *s, unsigned int ms, int (*func)(void));
 
 stream_export stream *open_rstream(const char *filename);
 stream_export stream *open_wstream(const char *filename);
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -526,20 +526,7 @@ MCreadClient(Client c)
                        mnstr_flush(c->fdout);
                        in->eof = 0;
                }
-               for (;;) {
-                       rd = bstream_next(in);
-                       if (GDKexiting())
-                               return 0;
-                       if (rd < 0) {
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to