Changeset: 9a93a50925ef for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/9a93a50925ef
Modified Files:
common/stream/mapi_stream.c
common/stream/socket_stream.c
monetdb5/mal/mal_client.c
monetdb5/modules/mal/tablet.c
sql/backends/monet5/sql.c
sql/server/sql_scan.c
Branch: Aug2024
Log Message:
Fix a problem where interrupting a query halfway through from the client
didn't work properly.
diffs (185 lines):
diff --git a/common/stream/mapi_stream.c b/common/stream/mapi_stream.c
--- a/common/stream/mapi_stream.c
+++ b/common/stream/mapi_stream.c
@@ -152,8 +152,12 @@ setup_transfer(const char *req, const ch
bool ok;
int oob = 0;
- while (!bs->eof)
- bstream_next(bs);
+ while (!bs->eof) {
+ if (bstream_next(bs) < 0) {
+ msg = mnstr_peek_error(ws);
+ goto end;
+ }
+ }
stream *rs = bs->s;
assert(isa_block_stream(ws));
assert(isa_block_stream(rs));
diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c
--- a/common/stream/socket_stream.c
+++ b/common/stream/socket_stream.c
@@ -126,7 +126,7 @@ socket_read(stream *restrict s, void *re
struct pollfd pfd;
pfd = (struct pollfd) {.fd = s->stream_data.s,
- .events = POLLIN};
+ .events = POLLIN | POLLPRI};
ret = poll(&pfd, 1, (int) s->timeout);
if (ret == -1 && errno == EINTR)
@@ -135,6 +135,20 @@ socket_read(stream *restrict s, void *re
mnstr_set_error_errno(s, MNSTR_READ_ERROR,
"poll error");
return -1;
}
+ if (ret == 1 && pfd.revents & POLLPRI) {
+ char b = 0;
+ switch (recv(s->stream_data.s, &b, 1, MSG_OOB))
{
+ case 0:
+ /* unexpectedly didn't receive a byte */
+ continue;
+ case 1:
+ mnstr_set_error(s, MNSTR_INTERRUPT,
"query abort from client");
+ return -1;
+ case -1:
+ mnstr_set_error_errno(s,
MNSTR_READ_ERROR, "recv error");
+ return -1;
+ }
+ }
#else
struct timeval tv;
fd_set fds;
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -107,11 +107,13 @@ MCpushClientInput(Client c, bstream *new
ClientInput *x = (ClientInput *) GDKmalloc(sizeof(ClientInput));
if (x == 0)
return -1;
- x->fdin = c->fdin;
- x->yycur = c->yycur;
- x->listing = c->listing;
- x->prompt = c->prompt;
- x->next = c->bak;
+ *x = (ClientInput) {
+ .fdin = c->fdin,
+ .yycur = c->yycur,
+ .listing = c->listing,
+ .prompt = c->prompt,
+ .next = c->bak,
+ };
c->bak = x;
c->fdin = new_input;
c->qryctx.bs = new_input;
@@ -567,7 +569,10 @@ MCreadClient(Client c)
if (!in->mode) /* read one line at a time in
line mode */
break;
}
- if (in->mode) { /* find last new line */
+ if (rd < 0) {
+ /* force end of stream handling below */
+ in->pos = in->len;
+ } else if (in->mode) { /* find last new line */
char *p = in->buf + in->len - 1;
while (p > in->buf && *p != '\n') {
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -601,8 +601,13 @@ tablet_read_more(READERtask *task)
do {
/* query is not finished ask for more */
/* we need more query text */
- if (bstream_next(in) < 0)
+ if (bstream_next(in) < 0) {
+ if (mnstr_errnr(in->s) == MNSTR_INTERRUPT) {
+ task->aborted = true;
+ mnstr_clearerr(in->s);
+ }
return false;
+ }
if (in->eof) {
if (bstream_getoob(in)) {
task->aborted = true;
@@ -612,7 +617,14 @@ tablet_read_more(READERtask *task)
mnstr_flush(out, MNSTR_FLUSH_DATA);
in->eof = false;
/* we need more query text */
- if (bstream_next(in) <= 0)
+ if (bstream_next(in) < 0) {
+ if (mnstr_errnr(in->s) ==
MNSTR_INTERRUPT) {
+ task->aborted = true;
+ mnstr_clearerr(in->s);
+ }
+ return false;
+ }
+ if (in->eof)
return false;
}
} while (in->len <= in->pos);
diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -2633,8 +2633,12 @@ mvc_export_table_wrap( Client cntxt, Mal
}
be->output_format = OFMT_CSV;
} else {
- while (!m->scanner.rs->eof)
- bstream_next(m->scanner.rs);
+ while (!m->scanner.rs->eof) {
+ if (bstream_next(m->scanner.rs) < 0) {
+ msg = createException(IO, "streams.open",
"interrupted");
+ goto wrapup_result_set1;
+ }
+ }
s = m->scanner.ws;
mnstr_write(s, PROMPT3, sizeof(PROMPT3) - 1, 1);
mnstr_printf(s, "w %s\n", filename);
@@ -2868,8 +2872,12 @@ mvc_export_row_wrap( Client cntxt, MalBl
goto wrapup_result_set;
}
} else {
- while (!m->scanner.rs->eof)
- bstream_next(m->scanner.rs);
+ while (!m->scanner.rs->eof) {
+ if (bstream_next(m->scanner.rs) < 0) {
+ msg = createException(IO, "streams.open",
"interrupted");
+ goto wrapup_result_set;
+ }
+ }
s = m->scanner.ws;
mnstr_write(s, PROMPT3, sizeof(PROMPT3) - 1, 1);
mnstr_printf(s, "w %s\n", filename);
@@ -4310,7 +4318,8 @@ SQLhot_snapshot(Client cntxt, MalBlkPtr
// sync with client, copy pasted from mvc_export_table_wrap
while (!mvc->scanner.rs->eof)
- bstream_next(mvc->scanner.rs);
+ if (bstream_next(mvc->scanner.rs) < 0)
+ throw(SQL, "sql.hot_snapshot", "interrupted");
// The snapshot code flushes from time to time.
// Use a callback stream to suppress those.
diff --git a/sql/server/sql_scan.c b/sql/server/sql_scan.c
--- a/sql/server/sql_scan.c
+++ b/sql/server/sql_scan.c
@@ -714,9 +714,16 @@ scanner_read_more(struct scanner *lc, si
more = true;
}
/* we need more query text */
- if (bstream_next(b) < 0 ||
- /* we asked for more data but didn't get any */
- (more && b->eof && b->len < b->pos + lc->yycur + n))
+ if (bstream_next(b) < 0) {
+ if (mnstr_errnr(b->s) == MNSTR_INTERRUPT) {
+ // now what?
+ lc->errstr = "Query aborted";
+ lc->aborted = true;
+ mnstr_clearerr(b->s);
+ }
+ return EOF;
+ } else if (/* we asked for more data but didn't get any */
+ (more && b->eof && b->len < b->pos + lc->yycur + n))
return EOF;
if (more && b->pos + lc->yycur + 2 == b->len && b->buf[b->pos +
lc->yycur] == '\200' && b->buf[b->pos + lc->yycur + 1] == '\n') {
lc->errstr = "Query aborted";
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]