Changeset: f15d5695796f for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f15d5695796f
Modified Files:
        clients/Tests/exports.stable.out
        monetdb5/mal/Tests/performanceTests/run
        monetdb5/mal/mal.h
        monetdb5/mal/mal_client.c
        monetdb5/mal/mal_client.h
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_resource.c
        monetdb5/mal/mal_resource.h
        monetdb5/mal/mal_runtime.c
        monetdb5/mal/mal_runtime.h
        monetdb5/modules/mal/sysmon.c
        sql/test/BugTracker-2017/Tests/sqlsmith.Bug-6219.stable.out
        sql/test/BugTracker-2017/Tests/sqlsmith.Bug-6432.stable.err
        sql/test/BugTracker-2017/Tests/sqlsmith.Bug-6432.stable.out
        sql/test/BugTracker-2017/Tests/sqlsmith04.stable.out
        sql/test/sys-schema/Tests/check_ForeignKey_referential_integrity.sql
        sql/test/sys-schema/Tests/check_PrimaryKey_uniqueness.sql
        sql/test/sys-schema/Tests/check_PrimaryKey_uniqueness.stable.out
Branch: sessions
Log Message:

Move the workers/memory status into the client record. Some cleanup, and
preparation for more decisions.


diffs (truncated from 556 to 300 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1347,7 +1347,7 @@ str LIKEjoin(bat *r1, bat *r2, const bat
 str LIKEjoin1(bat *r1, bat *r2, const bat *lid, const bat *rid, const bat *slid, const bat *srid, const bit *nil_matches, const lng *estimate);
 str MACROprocessor(Client cntxt, MalBlkPtr mb, Symbol t);
 int MAL_MAXCLIENTS;
-int MALadmission(lng argclaim, lng hotclaim);
+int MALadmission(Client cntxt, lng argclaim, lng hotclaim);
 str MALassertBit(void *ret, bit *val, str *msg);
 str MALassertHge(void *ret, hge *val, str *msg);
 str MALassertInt(void *ret, int *val, str *msg);
@@ -2356,8 +2356,6 @@ int mayhaveSideEffects(Client cntxt, Mal
 void mdbDump(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 str mdbRef;
 void mdbSetBreakRequest(Client cntxt, MalBlkPtr mb, str request, char cmd);
-int memoryclaims;
-lng memorypool;
 str mergecandRef;
 str mergepackRef;
 str minRef;
diff --git a/monetdb5/mal/Tests/performanceTests/run b/monetdb5/mal/Tests/performanceTests/run
--- a/monetdb5/mal/Tests/performanceTests/run
+++ b/monetdb5/mal/Tests/performanceTests/run
@@ -1,5 +1,7 @@
 #!/bin/bash
 
+# NEEDS UPDATES
+
 iter=5
 tsts="base tst400a tst400bHuge tst400cHuge tst400d tst400e tst901a tst901b"
 
diff --git a/monetdb5/mal/mal.h b/monetdb5/mal/mal.h
--- a/monetdb5/mal/mal.h
+++ b/monetdb5/mal/mal.h
@@ -89,8 +89,6 @@ mal_export lng MALdebug;
 mal_export char     monet_cwd[FILENAME_MAX];
 mal_export size_t      monet_memory;
 mal_export char        monet_characteristics[4096];
-mal_export lng                 memorypool;      /* memory claimed by concurrent threads */
-mal_export int                 memoryclaims;    /* number of threads active with expensive operations */
 mal_export stream      *maleventstream;
 
 #ifdef HAVE_HGE
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -249,6 +249,7 @@ MCinitClientRecord(Client c, oid user, b
        c->memorylimit = 0;
        c->querytimeout = 0;
        c->sessiontimeout = 0;
+       c->workers = c->memory = 0;
        c->itrace = 0;
        c->errbuf = 0;
 
@@ -372,6 +373,8 @@ MCforkClient(Client father)
                son->memorylimit = father->memorylimit;
                son->querytimeout = father->querytimeout;
                son->sessiontimeout = father->sessiontimeout;
+               son->workers = father->workers;
+               son->memory = father->memory;
 
                if (son->prompt)
                        GDKfree(son->prompt);
@@ -441,6 +444,7 @@ MCfreeClient(Client c)
        c->memorylimit = 0;
        c->querytimeout = 0;
        c->sessiontimeout = 0;
+       c->workers = c->memory = 0;
        c->user = oid_nil;
        if( c->username){
                GDKfree(c->username);
diff --git a/monetdb5/mal/mal_client.h b/monetdb5/mal/mal_client.h
--- a/monetdb5/mal/mal_client.h
+++ b/monetdb5/mal/mal_client.h
@@ -78,7 +78,9 @@ typedef struct CLIENT {
        /*
         * For program debugging and performance trace we keep the actual resource claims.
         */
-       time_t  lastcmd;                /* set when query is received */
+       time_t  lastcmd;        /* set when query is received */
+       int             workers;        /* Actual number of concurrent workers */
+       int             memory;         /* Actual memory claim highwater mark */
 
        /* The user can request a TRACE SQL statement, calling for collecting the events locally */
        BAT *profticks;                         
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -187,7 +187,6 @@ q_enqueue(Queue *q, FlowEvent d)
  * that want to use a big recent result
  */
 
-#ifdef USE_MAL_ADMISSION
 static void
 q_requeue_(Queue *q, FlowEvent d)
 {
@@ -216,7 +215,6 @@ q_requeue(Queue *q, FlowEvent d)
        MT_lock_unset(&q->l);
        MT_sema_up(&q->s);
 }
-#endif
 
 static FlowEvent
 q_dequeue(Queue *q, Client cntxt)
@@ -382,8 +380,7 @@ DFLOWworker(void *T)
                        continue;
                }
 
-#ifdef USE_MAL_ADMISSION
-               if (MALrunningThreads() > 2 && MALadmission(fe->argclaim, fe->hotclaim)) {
+               if (MALrunningThreads() > 2 && MALadmission(cntxt, fe->argclaim, fe->hotclaim)) {
                        // never block on deblockdataflow()
                        p= getInstrPtr(flow->mb,fe->pc);
                        if( p->fcn != (MALfcn) deblockdataflow){
@@ -395,14 +392,11 @@ DFLOWworker(void *T)
                                continue;
                        }
                }
-#endif
                error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0);
                PARDEBUG fprintf(stderr, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT "," LLFMT " %s\n",
                                                 fe->pc, id, fe->argclaim, fe->hotclaim, fe->maxclaim, error ? error : "");
-#ifdef USE_MAL_ADMISSION
                /* release the memory claim */
-               MALadmission(-fe->argclaim, -fe->hotclaim);
-#endif
+               MALadmission(cntxt, -fe->argclaim, -fe->hotclaim);
                /* update the numa information. keep the thread-id producing the value */
                p= getInstrPtr(flow->mb,fe->pc);
                for( i = 0; i < p->argc; i++)
@@ -428,7 +422,6 @@ DFLOWworker(void *T)
                 * because we hold the logical lock.
                 * All eligible instructions are queued
                 */
-#ifdef USE_MAL_ADMISSION
        {
                InstrPtr p = getInstrPtr(flow->mb, fe->pc);
                assert(p);
@@ -442,7 +435,6 @@ DFLOWworker(void *T)
                        if( footprint > fe->maxclaim) fe->maxclaim = footprint;
                }
        }
-#endif
                MT_lock_set(&flow->flowlock);
 
        for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
@@ -676,9 +668,6 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
                        fprintf(stderr, "\n");
                }
        }
-#ifdef USE_MAL_ADMISSION
-       memorypool = memoryclaims = 0;
-#endif
        return MAL_SUCCEED;
 }
 
@@ -711,10 +700,8 @@ DFLOWscheduler(DataFlow flow, struct wor
 {
        int last;
        int i;
-#ifdef USE_MAL_ADMISSION
        int j;
        InstrPtr p;
-#endif
        int tasks=0, actions;
        str ret = MAL_SUCCEED;
        FlowEvent fe, f = 0;
@@ -730,7 +717,6 @@ DFLOWscheduler(DataFlow flow, struct wor
        MT_lock_set(&flow->flowlock);
        for (i = 0; i < actions; i++)
                if (fe[i].blocks == 0) {
-#ifdef USE_MAL_ADMISSION
                        p = getInstrPtr(flow->mb,fe[i].pc);
                        if (p == NULL) {
                                MT_lock_unset(&flow->flowlock);
@@ -738,7 +724,6 @@ DFLOWscheduler(DataFlow flow, struct wor
                        }
                        for (j = p->retc; j < p->argc; j++)
                                fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, p, j, FALSE);
-#endif
                        q_enqueue(todo, flow->status + i);
                        flow->status[i].state = DFLOWrunning;
                        PARDEBUG fprintf(stderr, "#enqueue pc=%d claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
diff --git a/monetdb5/mal/mal_resource.c b/monetdb5/mal/mal_resource.c
--- a/monetdb5/mal/mal_resource.c
+++ b/monetdb5/mal/mal_resource.c
@@ -13,8 +13,8 @@
 #include "mal_private.h"
 
 /* MEMORY admission does not seem to have a major impact */
-lng memorypool = 0;      /* memory claimed by concurrent threads */
-int memoryclaims = 0;    /* number of threads active with expensive operations */
+static lng memorypool = 0;      /* memory claimed by concurrent threads */
+static int memoryclaims = 0;
 
 void
 mal_resource_reset(void)
@@ -88,53 +88,45 @@ getMemoryClaim(MalBlkPtr mb, MalStkPtr s
 }
 
 /*
- * A consequence of multiple threads is that they may claim more
- * space than available. This may cause GDKmalloc to fail.
- * In many cases this situation will be temporary, because
- * threads will ultimately release resources.
- * Therefore, we wait for it.
- *
- * Alternatively, a front-end can set the flow administration
- * program counter to -1, which leads to a soft abort.
- * [UNFORTUNATELY this approach does not (yet) work
- * because there seem to a possibility of a deadlock
- * between incref and bbptrim. Furthermore, we have
- * to be assured that the partial executed instruction
- * does not lead to ref-count errors.]
+ * The hotclaim indicates the amount of data recentely written as a result of an operation. 
+ * The argclaim provides a hint on how much we actually may need to execute
+ * The argclaim is the sum over the hotclaims for all arguments.
+ * The hotclaim is a hint on how large the result could be.
  *
- * The worker produces a result which will potentially unblock
- * instructions. This it can find itself without the help of the scheduler
- * and without the need for a lock. (does it?, parallel workers?)
- * It could also give preference to an instruction that eats away the object
- * just produced. THis way it need not be saved on disk for a long time.
+ * The client context also keeps bounds on the memory claim/client.
+ * Surpassing this bound may be a reason to not admit the instruction to proceed.
  */
-/*
- * The hotclaim indicates the amount of data recentely written.
- * as a result of an operation. The argclaim is the sum over the hotclaims
- * for all arguments.
- * The argclaim provides a hint on how much we actually may need to execute
- * The hotclaim is a hint how large the result would be.
- */
-#ifdef USE_MAL_ADMISSION
 static MT_Lock admissionLock = MT_LOCK_INITIALIZER("admissionLock");
+ATOMIC_TYPE mal_running = ATOMIC_VAR_INIT(0);
 
 /* experiments on sf-100 on small machine showed no real improvement */
+
 int
-MALadmission(lng argclaim, lng hotclaim)
+MALadmission(Client cntxt, lng argclaim, lng hotclaim)
 {
+       (void) cntxt;
        /* optimistically set memory */
        if (argclaim == 0)
                return 0;
 
        MT_lock_set(&admissionLock);
-       if (memoryclaims < 0)
+       /* Check if we are allowed a spawn another thread for this client */
+       /* It is somewhat tricky, because we may be in a recursion, each of which is counted for.
+        * The MAL interpreter should trim the worker thread count as soon as we call a FUNCTION/PATTERN recursively
+        */
+       if( cntxt->workerlimit && cntxt->workerlimit <= cntxt->workers){
+                       PARDEBUG
+                       fprintf(stderr, "#DFLOWadmit check workers %d <= %d\n", cntxt->workerlimit, cntxt->workers);
+       }
+       /* Determine if the total memory resource is exhausted */
+       if ( memoryclaims < 0)
                memoryclaims = 0;
-       if (memorypool <= 0 && memoryclaims == 0)
+       if ( memorypool <= 0 && memoryclaims == 0)
                memorypool = (lng)(MEMORY_THRESHOLD );
 
        if (argclaim > 0) {
-               if (memoryclaims == 0 || memorypool > argclaim + hotclaim) {
-                       memorypool -= (argclaim + hotclaim);
+               if ( memoryclaims == 0 || memorypool > argclaim + hotclaim) {
+                       memorypool -= (lng) (argclaim + hotclaim);
                        memoryclaims++;
                        PARDEBUG
                        fprintf(stderr, "#DFLOWadmit %3d thread %d pool " LLFMT "claims " LLFMT "," LLFMT "\n",
@@ -143,12 +135,13 @@ MALadmission(lng argclaim, lng hotclaim)
                        return 0;
                }
                PARDEBUG
-               fprintf(stderr, "#Delayed due to lack of memory " LLFMT " requested " LLFMT " memoryclaims %d\n", memorypool, argclaim + hotclaim, memoryclaims);
+               fprintf(stderr, "#Delayed due to lack of memory " LLFMT " requested " LLFMT " memoryclaims %d\n", 
+                       memorypool, argclaim + hotclaim, memoryclaims);
                MT_lock_unset(&admissionLock);
                return -1;
        }
        /* release memory claimed before */
-       memorypool += -argclaim - hotclaim;
+       memorypool += (lng) (-argclaim - hotclaim);
        memoryclaims--;
        PARDEBUG
        fprintf(stderr, "#DFLOWadmit %3d thread %d pool " LLFMT " claims " LLFMT "," LLFMT "\n",
@@ -156,19 +149,17 @@ MALadmission(lng argclaim, lng hotclaim)
        MT_lock_unset(&admissionLock);
        return 0;
 }
-#endif
 
-/* Delay a thread if too much competition arises and memory becomes a scarce resource.
- * If in the mean time memory becomes free, or too many sleeping re-enable worker.
- * It may happen that all threads enter the wait state. So, keep one running at all time 
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to