Changeset: 24303c28c7fc for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=24303c28c7fc
Modified Files:
common/stream/stream.c
common/stream/stream.h
monetdb5/mal/mal_interpreter.c
monetdb5/optimizer/opt_evaluate.c
monetdb5/optimizer/opt_support.c
Branch: default
Log Message:
Stop executing a query when the connection to the client is dropped.
In this way, when a user stops the client because the query takes too
long, the server stops.
We only check once per second.
diffs (225 lines):
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -173,6 +173,7 @@ struct stream {
int (*fgetpos) (stream *s, lng *p);
int (*fsetpos) (stream *s, lng p);
void (*update_timeout) (stream *s);
+ int (*isalive) (stream *s);
};
int
@@ -516,6 +517,17 @@ mnstr_fsetpos(stream *s, lng p)
return 0;
}
+int
+mnstr_isalive(stream *s)
+{
+ if (s == NULL)
+ return 0;
+ if (s->errnr)
+ return -1;
+ if (s->isalive)
+ return (*s->isalive)(s);
+ return 1;
+}
char *
mnstr_name(stream *s)
@@ -687,6 +699,7 @@ create_stream(const char *name)
s->timeout = 0;
s->timeout_func = NULL;
s->update_timeout = NULL;
+ s->isalive = NULL;
#ifdef STREAM_DEBUG
fprintf(stderr, "create_stream %s -> " PTRFMT "\n", name ? name :
"<unnamed>", PTRFMTCAST s);
#endif
@@ -1965,6 +1978,22 @@ socket_update_timeout(stream *s)
(void) setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv,
(socklen_t) sizeof(tv));
}
+static int
+socket_isalive(stream *s)
+{
+ SOCKET fd = s->stream_data.s;
+ char buffer[32];
+ fd_set fds;
+ struct timeval t;
+
+ t.tv_sec = 0;
+ t.tv_usec = 0;
+ FD_ZERO(&fds);
+ FD_SET(fd, &fds);
+ return select(fd + 1, &fds, NULL, NULL, &t) <= 0 ||
+ recv(fd, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT) != 0;
+}
+
static stream *
socket_open(SOCKET sock, const char *name)
{
@@ -1980,6 +2009,7 @@ socket_open(SOCKET sock, const char *nam
s->close = socket_close;
s->stream_data.s = sock;
s->update_timeout = socket_update_timeout;
+ s->isalive = socket_isalive;
errno = 0;
#ifdef _MSC_VER
@@ -2897,6 +2927,19 @@ ic_update_timeout(stream *s)
}
}
+static int
+ic_isalive(stream *s)
+{
+ struct icstream *ic = (struct icstream *) s->stream_data.p;
+
+ if (ic && ic->s) {
+ if (ic->s->isalive)
+ return (*ic->s->isalive)(ic->s);
+ return 1;
+ }
+ return 0;
+}
+
static void
ic_clrerr(stream *s)
{
@@ -2920,6 +2963,7 @@ ic_open(iconv_t cd, stream *ss, const ch
s->clrerr = ic_clrerr;
s->flush = ic_flush;
s->update_timeout = ic_update_timeout;
+ s->isalive = ic_isalive;
s->stream_data.p = malloc(sizeof(struct icstream));
if (s->stream_data.p == NULL) {
mnstr_destroy(s);
@@ -3488,6 +3532,19 @@ bs_update_timeout(stream *ss)
}
}
+static int
+bs_isalive(stream *ss)
+{
+ struct bs *s;
+
+ if ((s = ss->stream_data.p) != NULL && s->s) {
+ if (s->s->isalive)
+ return (*s->s->isalive)(s->s);
+ return 1;
+ }
+ return 0;
+}
+
static void
bs_close(stream *ss)
{
@@ -3555,6 +3612,7 @@ block_stream(stream *s)
ns->read = bs_read;
ns->write = bs_write;
ns->update_timeout = bs_update_timeout;
+ ns->isalive = bs_isalive;
ns->stream_data.p = (void *) b;
return ns;
@@ -4129,6 +4187,19 @@ wbs_update_timeout(stream *s)
}
}
+static int
+wbs_isalive(stream *s)
+{
+ wbs_stream *wbs = (wbs_stream *) s->stream_data.p;
+
+ if (wbs && wbs->s) {
+ if (wbs->s->isalive)
+ return (*wbs->s->isalive)(wbs->s);
+ return 1;
+ }
+ return 0;
+}
+
static void
wbs_clrerr(stream *s)
{
@@ -4159,6 +4230,7 @@ wbstream(stream *s, size_t buflen)
ns->destroy = wbs_destroy;
ns->flush = wbs_flush;
ns->update_timeout = wbs_update_timeout;
+ ns->isalive = wbs_isalive;
ns->write = wbs_write;
ns->stream_data.p = (void *) wbs;
wbs->s = s;
diff --git a/common/stream/stream.h b/common/stream/stream.h
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -142,6 +142,7 @@ stream_export void mnstr_set_byteorder(s
stream_export stream *mnstr_rstream(stream *s);
stream_export stream *mnstr_wstream(stream *s);
stream_export void mnstr_settimeout(stream *s, unsigned int ms, int
(*func)(void));
+stream_export int mnstr_isalive(stream *s);
stream_export stream *open_rstream(const char *filename);
stream_export stream *open_wstream(const char *filename);
diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c
--- a/monetdb5/mal/mal_interpreter.c
+++ b/monetdb5/mal/mal_interpreter.c
@@ -481,6 +481,8 @@ str runMALsequence(Client cntxt, MalBlkP
int garbages[16], *garbage;
int stkpc = 0;
RuntimeProfileRecord runtimeProfile, runtimeProfileFunction;
+ lng lastcheck = 0;
+#define CHECKINTERVAL 1000 /* how often do we check for client disconnect */
runtimeProfile.ticks = runtimeProfileFunction.ticks = 0;
if (stk == NULL)
@@ -525,7 +527,7 @@ str runMALsequence(Client cntxt, MalBlkP
pci = getInstrPtr(mb, stkpc);
if (cntxt->mode == FINISHCLIENT){
stkpc = stoppc;
- ret= createException(MAL, "mal.interpreter", "premature
stopped client");
+ ret= createException(MAL, "mal.interpreter",
"prematurely stopped client");
break;
}
if (cntxt->itrace || mb->trap || stk->status) {
@@ -550,6 +552,16 @@ str runMALsequence(Client cntxt, MalBlkP
//Ensure we spread system resources over multiple users as well.
runtimeProfileBegin(cntxt, mb, stk, pci, &runtimeProfile);
+ if (runtimeProfile.ticks > lastcheck + CHECKINTERVAL) {
+ if (!mnstr_isalive(cntxt->fdin->s)) {
+ cntxt->mode = FINISHCLIENT;
+ stkpc = stoppc;
+ ret= createException(MAL, "mal.interpreter",
"prematurely stopped client");
+ break;
+ }
+ lastcheck = runtimeProfile.ticks;
+ }
+
if (!RECYCLEentry(cntxt, mb, stk, pci,&runtimeProfile)){
/* The interpreter loop
* The interpreter is geared towards execution a MAL
diff --git a/monetdb5/optimizer/opt_evaluate.c
b/monetdb5/optimizer/opt_evaluate.c
--- a/monetdb5/optimizer/opt_evaluate.c
+++ b/monetdb5/optimizer/opt_evaluate.c
@@ -159,7 +159,7 @@ OPTevaluateImplementation(Client cntxt,
assigned[getArg(p,k)]++;
}
- for (i = 1; i < limit; i++) {
+ for (i = 1; i < limit && cntxt->mode != FINISHCLIENT; i++) {
p = getInstrPtr(mb, i);
// to avoid management of duplicate assignments over multiple
blocks
// we limit ourselfs to evaluation of the first assignment only.
diff --git a/monetdb5/optimizer/opt_support.c b/monetdb5/optimizer/opt_support.c
--- a/monetdb5/optimizer/opt_support.c
+++ b/monetdb5/optimizer/opt_support.c
@@ -210,6 +210,8 @@ OPTsetDebugStr(void *ret, str *nme)
str
optimizerCheck(Client cntxt, MalBlkPtr mb, str name, int actions, lng usec)
{
+ if (cntxt->mode == FINISHCLIENT)
+ throw(MAL, name, "prematurely stopped client");
if( actions > 0){
chkTypes(cntxt->fdout, cntxt->nspace, mb, FALSE);
chkFlow(cntxt->fdout, mb);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list