Changeset: c3c4203703da for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/c3c4203703da Modified Files: clients/Tests/exports.stable.out gdk/gdk_bat.c gdk/gdk_bbp.c gdk/gdk_select.c monetdb5/modules/mal/orderidx.c monetdb5/modules/mal/projectionpath.c monetdb5/optimizer/opt_dataflow.c monetdb5/optimizer/opt_reorder.c sql/backends/monet5/sql.c tools/monetdbe/monetdbe.c Branch: default Log Message:
Merge with Jul2021 branch. diffs (truncated from 1803 to 300 lines): diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -1452,6 +1452,7 @@ stream *lz4_stream(stream *inner, int pr void mnstr_clearerr(stream *s); void mnstr_close(stream *s); void mnstr_destroy(stream *s); +bool mnstr_eof(const stream *s); mnstr_error_kind mnstr_errnr(const stream *s); char *mnstr_error(const stream *s); const char *mnstr_error_kind_name(mnstr_error_kind k); diff --git a/clients/mapilib/ChangeLog.Jul2021 b/clients/mapilib/ChangeLog.Jul2021 --- a/clients/mapilib/ChangeLog.Jul2021 +++ b/clients/mapilib/ChangeLog.Jul2021 @@ -1,3 +1,9 @@ # ChangeLog file for mapilib # This file is updated with Maddlog +* Wed Jul 21 2021 Joeri van Ruth <[email protected]> +- Add optional MAPI header field which can be used to immediately + set reply size, autocommit, time zone and some other options, see + mapi.h. This makes client connection setup faster. Support has been + added to mapilib, pymonetdb and the jdbc driver. + diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -3707,6 +3707,8 @@ read_line(Mapi mid) printf("text:%s\n", mid->blk.buf + mid->blk.end); } if (len == 0) { /* add prompt */ + if (mnstr_eof(mid->from)) + return NULL; if (mid->blk.end > mid->blk.nxt) { /* add fake newline since newline was * missing from server */ @@ -4379,8 +4381,15 @@ read_into_cache(MapiHdl hdl, int lookahe result = hdl->result; /* may also be NULL */ for (;;) { line = read_line(mid); - if (line == NULL) + if (line == NULL) { + if (mnstr_eof(mid->from)) { + mapi_log_record(mid, "unexpected end of file"); + mapi_log_record(mid, __func__); + close_connection(mid); + return mapi_setError(mid, "unexpected end of file", __func__, MERROR); + } return mid->error; + } switch (*line) { case PROMPTBEG: /* \001 */ mid->active = NULL; diff --git a/common/stream/bs.c b/common/stream/bs.c --- a/common/stream/bs.c +++ b/common/stream/bs.c @@ -169,7 +169,7 @@ ssize_t bs_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt) { ssize_t ret = bs_read_internal(ss, buf, elmsize, cnt); - if (ret != 0) + if (ret != 0 || ss->eof) return ret; bs *b = (bs*) ss-> stream_data.p; @@ -222,6 +222,7 @@ bs_read_internal(stream *restrict ss, vo mnstr_copy_error(ss, ss->inner); return -1; case 0: + ss->eof |= ss->inner->eof; return 0; case 1: break; @@ -251,6 +252,7 @@ bs_read_internal(stream *restrict ss, vo ssize_t m = ss->inner->read(ss->inner, buf, 1, n); if (m <= 0) { + ss->eof |= ss->inner->eof; mnstr_copy_error(ss, ss->inner); return -1; } @@ -288,6 +290,7 @@ bs_read_internal(stream *restrict ss, vo mnstr_copy_error(ss, ss->inner); return -1; case 0: + ss->eof |= ss->inner->eof; return 0; case 1: break; diff --git a/common/stream/bs2.c b/common/stream/bs2.c --- a/common/stream/bs2.c +++ b/common/stream/bs2.c @@ -341,6 +341,7 @@ bs2_read(stream *restrict ss, void *rest mnstr_copy_error(ss, s->s); return -1; case 0: + ss->eof |= s->s->eof; return 0; case 1: break; @@ -370,6 +371,7 @@ bs2_read(stream *restrict ss, void *rest ssize_t bytes_read = 0; bytes_read = s->s->read(s->s, buf + m, 1, s->itotal - m); if (bytes_read <= 0) { + ss->eof |= s->s->eof; mnstr_copy_error(ss, s->s); return -1; } @@ -419,6 +421,7 @@ bs2_read(stream *restrict ss, void *rest mnstr_copy_error(ss, s->s); return -1; case 0: + ss->eof |= s->s->eof; return 0; case 1: break; @@ -450,6 +453,7 @@ bs2_read(stream *restrict ss, void *rest ssize_t bytes_read = 0; bytes_read = s->s->read(s->s, buf + m, 1, s->itotal - m); if (bytes_read <= 0) { + ss->eof |= s->s->eof; mnstr_copy_error(ss, s->s); return -1; } diff --git a/common/stream/callback.c b/common/stream/callback.c --- a/common/stream/callback.c +++ b/common/stream/callback.c @@ -52,7 +52,9 @@ cb_read(stream *restrict s, void *restri { struct cbstream *cb = s->stream_data.p; - return cb->read(cb->private, buf, elmsize, cnt); + ssize_t ret = cb->read(cb->private, buf, elmsize, cnt); + s->eof |= ret == 0; + return ret; } static ssize_t diff --git a/common/stream/fwf.c b/common/stream/fwf.c --- a/common/stream/fwf.c +++ b/common/stream/fwf.c @@ -55,13 +55,14 @@ stream_fwf_read(stream *restrict s, void if (actually_read < 0) { return actually_read; /* this is an error */ } - fsd->eof = true; + fsd->eof |= fsd->s->eof; return (ssize_t) buf_written; /* skip last line */ } /* consume to next newline */ while (fsd->s->read(fsd->s, &nl_buf, 1, 1) == 1 && nl_buf != '\n') ; + fsd->eof |= fsd->s->eof; for (field_idx = 0; field_idx < fsd->num_fields; field_idx++) { char *val_start, *val_end; diff --git a/common/stream/iconv_stream.c b/common/stream/iconv_stream.c --- a/common/stream/iconv_stream.c +++ b/common/stream/iconv_stream.c @@ -191,7 +191,7 @@ ic_read(stream *restrict s, void *restri if (inbuf > ic->buffer) memmove(ic->buffer, inbuf, inbytesleft); ic->buflen = inbytesleft; - if (outbytesleft == elmsize * cnt) { + if (outbytesleft == elmsize * cnt && !s->inner->eof) { /* if we're returning data, we must pass on EOF on the * next call (i.e. keep ic->eof set), otherwise we * must clear it so that the next call will cause the diff --git a/common/stream/memio.c b/common/stream/memio.c --- a/common/stream/memio.c +++ b/common/stream/memio.c @@ -97,6 +97,7 @@ buffer_read(stream *restrict s, void *re b->pos += size; return (ssize_t) (size / elmsize); } + s->eof |= b->pos == b->len; return 0; } diff --git a/common/stream/pump.c b/common/stream/pump.c --- a/common/stream/pump.c +++ b/common/stream/pump.c @@ -199,12 +199,13 @@ pump_in(stream *s) // Error. Return directly, discarding any data lingering // in the internal state. return PUMP_ERROR; - if (nread == 0) + if (nread == 0) { // Set to NULL so we'll remember next time. // Maybe there is some data in the internal state we don't // return immediately. src = (pump_buffer){.start=NULL, .count=0}; - else + s->eof |= s->inner->eof; + } else // All good src = (pump_buffer) { .start = buffer.start, .count = nread}; diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c --- a/common/stream/socket_stream.c +++ b/common/stream/socket_stream.c @@ -186,8 +186,10 @@ socket_read(stream *restrict s, void *re #endif break; } - if (nr == 0) + if (nr == 0) { + s->eof = true; return 0; /* end of file */ + } if (elmsize > 1) { while ((size_t) nr % elmsize != 0) { /* if elmsize > 1, we really expect that "the diff --git a/common/stream/stdio_stream.c b/common/stream/stdio_stream.c --- a/common/stream/stdio_stream.c +++ b/common/stream/stdio_stream.c @@ -36,6 +36,7 @@ file_read(stream *restrict s, void *rest mnstr_set_error_errno(s, MNSTR_READ_ERROR, "read error"); return -1; } + s->eof |= rc == 0 && feof(fp); } return (ssize_t) rc; } diff --git a/common/stream/stream.c b/common/stream/stream.c --- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -577,6 +577,12 @@ mnstr_isalive(const stream *s) } +bool +mnstr_eof(const stream *s) +{ + return s->eof; +} + char * mnstr_name(const stream *s) { @@ -704,6 +710,7 @@ create_stream(const char *name) .readonly = true, .isutf8 = false, /* not known for sure */ .binary = false, + .eof = false, .name = strdup(name), .errkind = MNSTR_NO__ERROR, .errmsg = {0}, @@ -726,7 +733,9 @@ create_stream(const char *name) static ssize_t wrapper_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt) { - return s->inner->read(s->inner, buf, elmsize, cnt); + ssize_t ret = s->inner->read(s->inner, buf, elmsize, cnt); + s->eof |= s->inner->eof; + return ret; } diff --git a/common/stream/stream.h b/common/stream/stream.h --- a/common/stream/stream.h +++ b/common/stream/stream.h @@ -158,6 +158,7 @@ stream_export bool mnstr_get_swapbytes(c stream_export void mnstr_set_bigendian(stream *s, bool bigendian); // used in mapi.c and mal_session.c stream_export void mnstr_settimeout(stream *s, unsigned int ms, bool (*func)(void *), void *data); // used in mapi.c and mal_session.c stream_export int mnstr_isalive(const stream *s); // used once in mal_interpreter.c +stream_export bool mnstr_eof(const stream *s); // stream saw end-of-file stream_export stream *open_rstream(const char *filename); // used in mclient.c, gdk_logger.c, store.c, snapshot.c stream_export stream *open_wstream(const char *filename); // used in gdk_logger.c and store.c diff --git a/common/stream/stream_internal.h b/common/stream/stream_internal.h --- a/common/stream/stream_internal.h +++ b/common/stream/stream_internal.h @@ -151,6 +151,7 @@ struct stream { bool readonly; /* only reading or only writing */ bool isutf8; /* known to be UTF-8 due to BOM */ bool binary; /* text/binary */ + bool eof; /* we've seen end-of-file (i.e. not block end) */ unsigned int timeout; /* timeout in ms */ bool (*timeout_func)(void *); /* callback function: NULL/true -> return */ void *timeout_data; /* data for the above */ diff --git a/common/stream/winio.c b/common/stream/winio.c --- a/common/stream/winio.c +++ b/common/stream/winio.c @@ -92,6 +92,7 @@ console_read(stream *restrict s, void *r c->rd = 0; if (c->len > 0 && c->wbuf[0] == 26) { /* control-Z */ c->len = 0; + s->eof = true; return 0; } if (c->len > 0 && c->wbuf[0] == 0xFEFF) @@ -302,8 +303,10 @@ pipe_read(stream *restrict s, void *rest for (;;) { DWORD ret = PeekNamedPipe(h, NULL, 0, NULL, &nread, NULL); if (ret == 0) { - if (GetLastError() == ERROR_BROKEN_PIPE) + if (GetLastError() == ERROR_BROKEN_PIPE) { + s->eof = true; _______________________________________________ checkin-list mailing list [email protected] https://www.monetdb.org/mailman/listinfo/checkin-list
