Changeset: 24303c28c7fc for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=24303c28c7fc
Modified Files:
        common/stream/stream.c
        common/stream/stream.h
        monetdb5/mal/mal_interpreter.c
        monetdb5/optimizer/opt_evaluate.c
        monetdb5/optimizer/opt_support.c
Branch: default
Log Message:

Stop executing a query when the connection to the client is dropped.
In this way, when a user stops the client because the query takes too
long, the server stops executing that query as well.
We only check for a dropped connection once per second.


diffs (225 lines):

diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -173,6 +173,7 @@ struct stream {
        int (*fgetpos) (stream *s, lng *p);
        int (*fsetpos) (stream *s, lng p);
        void (*update_timeout) (stream *s);
+       int (*isalive) (stream *s);
 };
 
 int
@@ -516,6 +517,17 @@ mnstr_fsetpos(stream *s, lng p)
        return 0;
 }
 
+int /* liveness of stream s: 0 for a NULL stream, -1 when s->errnr is set, */
+mnstr_isalive(stream *s) /* otherwise the result of the stream's isalive hook, or 1 if it has none */
+{
+       if (s == NULL)
+               return 0; /* no stream at all counts as not alive */
+       if (s->errnr)
+               return -1; /* error recorded on the stream; note this is nonzero, so a plain !mnstr_isalive() test does not catch it */
+       if (s->isalive)
+               return (*s->isalive)(s); /* delegate to the stream-type specific check */
+       return 1; /* stream type provides no isalive hook: report alive */
+}
 
 char *
 mnstr_name(stream *s)
@@ -687,6 +699,7 @@ create_stream(const char *name)
        s->timeout = 0;
        s->timeout_func = NULL;
        s->update_timeout = NULL;
+       s->isalive = NULL;
 #ifdef STREAM_DEBUG
        fprintf(stderr, "create_stream %s -> " PTRFMT "\n", name ? name : 
"<unnamed>", PTRFMTCAST s);
 #endif
@@ -1965,6 +1978,22 @@ socket_update_timeout(stream *s)
                (void) setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, 
(socklen_t) sizeof(tv));
 }
 
+static int /* isalive hook for socket streams: returns 0 only when the socket polls readable and a peeking recv() returns 0 */
+socket_isalive(stream *s)
+{
+       SOCKET fd = s->stream_data.s;
+       char buffer[32]; /* scratch space for the peek; contents are never inspected */
+       fd_set fds;
+       struct timeval t;
+
+       t.tv_sec = 0; /* zero timeout: select() only polls and returns immediately */
+       t.tv_usec = 0;
+       FD_ZERO(&fds);
+       FD_SET(fd, &fds);
+       return select(fd + 1, &fds, NULL, NULL, &t) <= 0 || /* nothing readable (0) or select failed (<0): report alive */
+               recv(fd, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT) != 0; /* MSG_PEEK leaves pending data queued; a recv error (-1) also counts as alive, only 0 reports dead */
+}
+
 static stream *
 socket_open(SOCKET sock, const char *name)
 {
@@ -1980,6 +2009,7 @@ socket_open(SOCKET sock, const char *nam
        s->close = socket_close;
        s->stream_data.s = sock;
        s->update_timeout = socket_update_timeout;
+       s->isalive = socket_isalive;
 
        errno = 0;
 #ifdef _MSC_VER
@@ -2897,6 +2927,19 @@ ic_update_timeout(stream *s)
        }
 }
 
+static int /* isalive hook for iconv streams: forwards the question to the wrapped stream ic->s */
+ic_isalive(stream *s)
+{
+       struct icstream *ic = (struct icstream *) s->stream_data.p;
+
+       if (ic && ic->s) {
+               if (ic->s->isalive) /* the hook is called directly, so the wrapped stream's errnr is not consulted here */
+                       return (*ic->s->isalive)(ic->s);
+               return 1; /* wrapped stream has no isalive hook: report alive */
+       }
+       return 0; /* no private data or no wrapped stream: report not alive */
+}
+
 static void
 ic_clrerr(stream *s)
 {
@@ -2920,6 +2963,7 @@ ic_open(iconv_t cd, stream *ss, const ch
        s->clrerr = ic_clrerr;
        s->flush = ic_flush;
        s->update_timeout = ic_update_timeout;
+       s->isalive = ic_isalive;
        s->stream_data.p = malloc(sizeof(struct icstream));
        if (s->stream_data.p == NULL) {
                mnstr_destroy(s);
@@ -3488,6 +3532,19 @@ bs_update_timeout(stream *ss)
        }
 }
 
+static int /* isalive hook for block streams: forwards the question to the wrapped stream s->s */
+bs_isalive(stream *ss)
+{
+       struct bs *s;
+
+       if ((s = ss->stream_data.p) != NULL && s->s) {
+               if (s->s->isalive) /* the hook is called directly, so the wrapped stream's errnr is not consulted here */
+                       return (*s->s->isalive)(s->s);
+               return 1; /* wrapped stream has no isalive hook: report alive */
+       }
+       return 0; /* no private data or no wrapped stream: report not alive */
+}
+
 static void
 bs_close(stream *ss)
 {
@@ -3555,6 +3612,7 @@ block_stream(stream *s)
        ns->read = bs_read;
        ns->write = bs_write;
        ns->update_timeout = bs_update_timeout;
+       ns->isalive = bs_isalive;
        ns->stream_data.p = (void *) b;
 
        return ns;
@@ -4129,6 +4187,19 @@ wbs_update_timeout(stream *s)
        }
 }
 
+static int /* isalive hook for write-buffered streams: forwards the question to the wrapped stream wbs->s */
+wbs_isalive(stream *s)
+{
+       wbs_stream *wbs = (wbs_stream *) s->stream_data.p;
+
+       if (wbs && wbs->s) {
+               if (wbs->s->isalive) /* the hook is called directly, so the wrapped stream's errnr is not consulted here */
+                       return (*wbs->s->isalive)(wbs->s);
+               return 1; /* wrapped stream has no isalive hook: report alive */
+       }
+       return 0; /* no private data or no wrapped stream: report not alive */
+}
+
 static void
 wbs_clrerr(stream *s)
 {
@@ -4159,6 +4230,7 @@ wbstream(stream *s, size_t buflen)
        ns->destroy = wbs_destroy;
        ns->flush = wbs_flush;
        ns->update_timeout = wbs_update_timeout;
+       ns->isalive = wbs_isalive;
        ns->write = wbs_write;
        ns->stream_data.p = (void *) wbs;
        wbs->s = s;
diff --git a/common/stream/stream.h b/common/stream/stream.h
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -142,6 +142,7 @@ stream_export void mnstr_set_byteorder(s
 stream_export stream *mnstr_rstream(stream *s);
 stream_export stream *mnstr_wstream(stream *s);
 stream_export void mnstr_settimeout(stream *s, unsigned int ms, int 
(*func)(void));
+stream_export int mnstr_isalive(stream *s);
 
 stream_export stream *open_rstream(const char *filename);
 stream_export stream *open_wstream(const char *filename);
diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c
--- a/monetdb5/mal/mal_interpreter.c
+++ b/monetdb5/mal/mal_interpreter.c
@@ -481,6 +481,8 @@ str runMALsequence(Client cntxt, MalBlkP
        int garbages[16], *garbage;
        int stkpc = 0;
        RuntimeProfileRecord runtimeProfile, runtimeProfileFunction;
+       lng lastcheck = 0;
+#define CHECKINTERVAL 1000 /* how often do we check for client disconnect */
        runtimeProfile.ticks = runtimeProfileFunction.ticks = 0;
 
        if (stk == NULL)
@@ -525,7 +527,7 @@ str runMALsequence(Client cntxt, MalBlkP
                pci = getInstrPtr(mb, stkpc);
                if (cntxt->mode == FINISHCLIENT){
                        stkpc = stoppc;
-                       ret= createException(MAL, "mal.interpreter", "premature 
stopped client");
+                       ret= createException(MAL, "mal.interpreter", 
"prematurely stopped client");
                        break;
                }
                if (cntxt->itrace || mb->trap || stk->status) {
@@ -550,6 +552,16 @@ str runMALsequence(Client cntxt, MalBlkP
 
                //Ensure we spread system resources over multiple users as well.
                runtimeProfileBegin(cntxt, mb, stk, pci, &runtimeProfile);
+               if (runtimeProfile.ticks > lastcheck + CHECKINTERVAL) {
+                       if (!mnstr_isalive(cntxt->fdin->s)) {
+                               cntxt->mode = FINISHCLIENT;
+                               stkpc = stoppc;
+                               ret= createException(MAL, "mal.interpreter", 
"prematurely stopped client");
+                               break;
+                       }
+                       lastcheck = runtimeProfile.ticks;
+               }
+
         if (!RECYCLEentry(cntxt, mb, stk, pci,&runtimeProfile)){
                        /* The interpreter loop
                         * The interpreter is geared towards execution a MAL
diff --git a/monetdb5/optimizer/opt_evaluate.c b/monetdb5/optimizer/opt_evaluate.c
--- a/monetdb5/optimizer/opt_evaluate.c
+++ b/monetdb5/optimizer/opt_evaluate.c
@@ -159,7 +159,7 @@ OPTevaluateImplementation(Client cntxt, 
                        assigned[getArg(p,k)]++;
        }
 
-       for (i = 1; i < limit; i++) {
+       for (i = 1; i < limit && cntxt->mode != FINISHCLIENT; i++) {
                p = getInstrPtr(mb, i);
                // to avoid management of duplicate assignments over multiple 
blocks
                // we limit ourselfs to evaluation of the first assignment only.
diff --git a/monetdb5/optimizer/opt_support.c b/monetdb5/optimizer/opt_support.c
--- a/monetdb5/optimizer/opt_support.c
+++ b/monetdb5/optimizer/opt_support.c
@@ -210,6 +210,8 @@ OPTsetDebugStr(void *ret, str *nme)
 str
 optimizerCheck(Client cntxt, MalBlkPtr mb, str name, int actions, lng usec)
 {
+       if (cntxt->mode == FINISHCLIENT)
+               throw(MAL, name, "prematurely stopped client");
        if( actions > 0){
                chkTypes(cntxt->fdout, cntxt->nspace, mb, FALSE);
                chkFlow(cntxt->fdout, mb);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to