Changeset: 020854aba345 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=020854aba345 Added Files: clients/mapilib/ChangeLog common/stream/ChangeLog Modified Files: clients/Tests/exports.stable.out clients/mapilib/mapi.c clients/odbc/driver/SQLConnect.c clients/odbc/driver/SQLSetConnectAttr.c common/stream/stream.c common/stream/stream.h monetdb5/mal/mal_client.c Branch: default Log Message:
Implemented timeout callback function. You can specify a read and write timeout on a stream (only implemented (or even interesting) for socket streams) which causes the read/write to get aborted after the specified timeout. If no data has been transferred, the function returns an error. If you specify a callback function, it is called when the timeout happens, and if the callback function returns FALSE (0), the read/write is tried again. The timeout value was changed from seconds to milliseconds. diffs (truncated from 313 to 300 lines): diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -2984,7 +2984,7 @@ ssize_t mnstr_read_block(stream *s, void ssize_t mnstr_readline(stream *s, void *buf, size_t maxcnt); stream *mnstr_rstream(stream *s); void mnstr_set_byteorder(stream *s, char bigendian); -void mnstr_settimeout(stream *s, unsigned int secs); +void mnstr_settimeout(stream *s, unsigned int ms, int( *func)(void)); int mnstr_type(stream *s); ssize_t mnstr_write(stream *s, const void *buf, size_t elmsize, size_t cnt); int mnstr_writeBte(stream *s, signed char val); diff --git a/clients/mapilib/ChangeLog b/clients/mapilib/ChangeLog new file mode 100644 --- /dev/null +++ b/clients/mapilib/ChangeLog @@ -0,0 +1,6 @@ +# ChangeLog file for mapilib +# This file is updated with Maddlog + +* Fri Mar 28 2014 Sjoerd Mullender <[email protected]> +- Changed mapi_timeout argument from seconds to milliseconds. + diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -2988,8 +2988,8 @@ mapi_timeout(Mapi mid, unsigned int time mapi_check(mid, "mapi_timeout"); if (mid->trace == MAPI_TRACE) printf("Set timeout to %u\n", timeout); - mnstr_settimeout(mid->to, timeout); - mnstr_settimeout(mid->from, timeout); + mnstr_settimeout(mid->to, timeout, NULL); + mnstr_settimeout(mid->from, timeout, NULL); return MOK; } diff --git a/clients/odbc/driver/SQLConnect.c b/clients/odbc/driver/SQLConnect.c --- a/clients/odbc/driver/SQLConnect.c +++ b/clients/odbc/driver/SQLConnect.c @@ -267,7 +267,7 @@ SQLConnect_(ODBCDbc *dbc, (dbc->major == 11 && dbc->minor >= 5)) mapi_set_size_header(mid, 1); /* set timeout after we're connected */ - mapi_timeout(mid, dbc->sql_attr_connection_timeout); + mapi_timeout(mid, dbc->sql_attr_connection_timeout * 1000); } return rc; diff --git a/clients/odbc/driver/SQLSetConnectAttr.c b/clients/odbc/driver/SQLSetConnectAttr.c --- a/clients/odbc/driver/SQLSetConnectAttr.c +++ b/clients/odbc/driver/SQLSetConnectAttr.c @@ -99,7 +99,7 @@ SQLSetConnectAttr_(ODBCDbc *dbc, case SQL_ATTR_CONNECTION_TIMEOUT: dbc->sql_attr_connection_timeout = (SQLUINTEGER) (size_t) ValuePtr; if (dbc->mid) - mapi_timeout(dbc->mid, dbc->sql_attr_connection_timeout); + mapi_timeout(dbc->mid, dbc->sql_attr_connection_timeout * 1000); break; case SQL_ATTR_TXN_ISOLATION: /* nothing to change, we only do the highest level */ diff --git a/common/stream/ChangeLog b/common/stream/ChangeLog new file mode 100644 --- /dev/null +++ b/common/stream/ChangeLog @@ -0,0 +1,10 @@ +# ChangeLog file for stream +# This file is updated with Maddlog + +* Fri Mar 28 2014 Sjoerd Mullender <[email protected]> +- Changed mnstr_settimeout function so that the specified timeout is now + in milliseconds (used to be seconds), and that it also needs an extra + argument specifying a callback function (no arguments, int result) + that should return TRUE if the timeout should cause the function to + abort or continue what it was doing. + diff --git a/common/stream/stream.c b/common/stream/stream.c --- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -141,7 +141,8 @@ struct stream { char isutf8; /* known to be UTF-8 due to BOM */ short type; /* ascii/binary */ char *name; - unsigned int timeout; + unsigned int timeout; /* timeout in ms */ + int (*timeout_func)(void); /* callback function: NULL/true -> return */ union { void *p; int i; @@ -159,11 +160,6 @@ struct stream { int (*fgetpos) (stream *s, lng *p); int (*fsetpos) (stream *s, lng p); void (*update_timeout) (stream *s); - /* in case read() read a non-integral number of elements we - * save the last partial element here (only used in - * socket_read() */ - void *buf; - size_t len; }; int @@ -276,9 +272,10 @@ mnstr_write(stream *s, const void *buf, } void -mnstr_settimeout(stream *s, unsigned int secs) +mnstr_settimeout(stream *s, unsigned int ms, int (*func)(void)) { - s->timeout = secs; + s->timeout = ms; + s->timeout_func = func; if (s->update_timeout) (*s->update_timeout)(s); } @@ -481,8 +478,6 @@ get_extention(const char *file) static void destroy(stream *s) { - if (s->buf) - free(s->buf); free(s->name); free(s); } @@ -535,9 +530,8 @@ create_stream(const char *name) s->fgetpos = NULL; s->fsetpos = NULL; s->timeout = 0; + s->timeout_func = NULL; s->update_timeout = NULL; - s->buf = NULL; - s->len = 0; #ifdef STREAM_DEBUG printf("create_stream %s -> " PTRFMT "\n", name ? name : "<unnamed>", PTRFMTCAST s); #endif @@ -1510,9 +1504,12 @@ socket_write(stream *s, const void *buf, ((nr = write(s->stream_data.s, ((const char *) buf + res), size - res)) > 0) #endif - || (s->timeout == 0 - && (errno == EAGAIN || errno == EWOULDBLOCK)) - || errno == EINTR) + || (nr < 0 && /* syscall failed */ + s->timeout > 0 && /* potentially timeout */ + (errno == EAGAIN || errno == EWOULDBLOCK) && /* it was! */ + s->timeout_func != NULL && /* callback function exists */ + !(*s->timeout_func)()) /* callback says don't stop */ + || (nr < 0 && errno == EINTR)) /* interrupted */ ) { errno = 0; if (nr > 0) @@ -1538,20 +1535,19 @@ socket_read(stream *s, void *buf, size_t if (!s || s->errnr || size == 0) return -1; - assert((s->buf == NULL) == (s->len == 0)); - if (s->buf) { - assert((size_t) size > s->len); - memcpy(buf, s->buf, s->len); - } - + do { + errno = 0; #ifdef NATIVE_WIN32 - if (size > INT_MAX) - size = elmsize * (INT_MAX / elmsize); - nr = recv(s->stream_data.s, (char *) buf + s->len, (int) (size - s->len), 0); + if (size > INT_MAX) + size = elmsize * (INT_MAX / elmsize); + nr = recv(s->stream_data.s, buf, (int) size, 0); #else - nr = read(s->stream_data.s, (char *) buf + s->len, size - s->len); + nr = read(s->stream_data.s, buf, size); #endif - if (nr == -1) { + } while (nr == -1 && s->timeout > 0 && + (errno == EAGAIN || errno == EWOULDBLOCK) && + s->timeout_func && !(*s->timeout_func)()); + if (nr < 0) { if (s->timeout > 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) s->errnr = MNSTR_TIMEOUT; else @@ -1560,17 +1556,30 @@ socket_read(stream *s, void *buf, size_t } if (nr == 0) return 0; /* end of file */ - if (s->buf) { - nr += s->len; - free(s->buf); - s->buf = NULL; - s->len = 0; - } - if (elmsize > 1 && (cnt = nr % elmsize) != 0) { - s->buf = malloc(cnt); - memcpy(s->buf, (char *) buf + nr - cnt, cnt); - s->len = cnt; - nr -= cnt; + while (elmsize > 1 && nr % elmsize != 0) { + /* if elmsize > 1, we really expect that "the other + * side" wrote complete items in a single system call, + * so we expect to at least receive complete items, + * and hence we continue reading until we did in fact + * receive an integral number of complete items, + * ignoring any timeouts (but not real errors) + * (note that recursion is limited since we don't + * propagate the element size to the recursive + * call) */ + ssize_t n; + n = socket_read(s, (char *) buf + nr, 1, (size_t) (size - nr)); + if (n < 0) { + if (s->errnr == MNSTR_TIMEOUT) { + /* ignore timeout */ + s->errnr = MNSTR_NO__ERROR; + continue; + } + /* some other read error is serious */ + return -1; + } + if (n == 0) /* unexpected end of file */ + break; + nr += n; } return (ssize_t) (nr / elmsize); } @@ -1598,10 +1607,6 @@ socket_close(stream *s) } } s->stream_data.s = INVALID_SOCKET; - if (s->buf) - free(s->buf); - s->buf = NULL; - s->len = 0; } static void @@ -1610,11 +1615,13 @@ socket_update_timeout(stream *s) SOCKET fd = s->stream_data.s; struct timeval tv; - tv.tv_sec = s->timeout; - tv.tv_usec = 0; + tv.tv_sec = s->timeout / 1000; + tv.tv_usec = (s->timeout % 1000) * 1000; /* cast to char * for Windows, no harm on "normal" systems */ - setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, (socklen_t) sizeof(tv)); - setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, (socklen_t) sizeof(tv)); + if (s->access == ST_READ) + setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, (socklen_t) sizeof(tv)); + if (s->access == ST_WRITE) + setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, (socklen_t) sizeof(tv)); } static stream * @@ -2222,6 +2229,7 @@ ic_update_timeout(stream *s) if (ic && ic->s) { ic->s->timeout = s->timeout; + ic->s->timeout_func = s->timeout_func; if (ic->s->update_timeout) (*ic->s->update_timeout)(ic->s); } @@ -2764,6 +2772,7 @@ bs_update_timeout(stream *ss) bs *s = ss->stream_data.p; if (s && s->s) { s->s->timeout = ss->timeout; + s->s->timeout_func = ss->timeout_func; if (s->s->update_timeout) (*s->s->update_timeout)(s->s); } @@ -3280,6 +3289,7 @@ wbs_update_timeout(stream *s) wbs_stream *wbs = (wbs_stream *) s->stream_data.p; if (wbs && wbs->s) { wbs->s->timeout = s->timeout; + wbs->s->timeout_func = s->timeout_func; if (wbs->s->update_timeout) (*wbs->s->update_timeout)(wbs->s); } diff --git a/common/stream/stream.h b/common/stream/stream.h --- a/common/stream/stream.h +++ b/common/stream/stream.h @@ -132,7 +132,7 @@ stream_export int mnstr_byteorder(stream stream_export void mnstr_set_byteorder(stream *s, char bigendian); stream_export stream *mnstr_rstream(stream *s); stream_export stream *mnstr_wstream(stream *s); -stream_export void mnstr_settimeout(stream *s, unsigned int secs); +stream_export void mnstr_settimeout(stream *s, unsigned int ms, int (*func)(void)); stream_export stream *open_rstream(const char *filename); stream_export stream *open_wstream(const char *filename); diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c --- a/monetdb5/mal/mal_client.c +++ b/monetdb5/mal/mal_client.c @@ -526,20 +526,7 @@ MCreadClient(Client c) mnstr_flush(c->fdout); in->eof = 0; } - for (;;) { - rd = bstream_next(in); - if (GDKexiting()) - return 0; - if (rd < 0) { _______________________________________________ checkin-list mailing list [email protected] https://www.monetdb.org/mailman/listinfo/checkin-list
