Changeset: 30d7cced0caf for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=30d7cced0caf
Modified Files:
clients/Tests/exports.stable.out
monetdb5/mal/mal.h
monetdb5/mal/mal_client.c
monetdb5/mal/mal_client.h
monetdb5/mal/mal_dataflow.c
monetdb5/mal/mal_interpreter.c
monetdb5/mal/mal_resource.c
monetdb5/mal/mal_resource.h
monetdb5/mal/mal_runtime.c
monetdb5/modules/mal/sysmon.c
Branch: sessions
Log Message:
The workers and memory consumption should be associated with a dataflow stack.
This limits parallelism within a single dataflow block.
The alternative, keeping it at the Client level, makes it much more difficult
to decide if a particular thread may continue, in particular when we are
confronted with a recursive MAL block call.
diffs (199 lines):
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1347,7 +1347,7 @@ str LIKEjoin(bat *r1, bat *r2, const bat
str LIKEjoin1(bat *r1, bat *r2, const bat *lid, const bat *rid, const bat
*slid, const bat *srid, const bit *nil_matches, const lng *estimate);
str MACROprocessor(Client cntxt, MalBlkPtr mb, Symbol t);
int MAL_MAXCLIENTS;
-int MALadmission(Client cntxt, lng argclaim, lng hotclaim);
+int MALadmission(Client cntxt, MalStkPtr stk, lng argclaim, lng hotclaim);
str MALassertBit(void *ret, bit *val, str *msg);
str MALassertHge(void *ret, hge *val, str *msg);
str MALassertInt(void *ret, int *val, str *msg);
diff --git a/monetdb5/mal/mal.h b/monetdb5/mal/mal.h
--- a/monetdb5/mal/mal.h
+++ b/monetdb5/mal/mal.h
@@ -271,6 +271,9 @@ typedef struct MALSTK {
char status; /* srunning 'R' suspended 'S', quiting 'Q' */
int pcup; /* saved pc upon a recursive all */
oid tag; /* unique invocation call tag */
+ ATOMIC_TYPE workers; /* Actual number of concurrent
workers */
+ ATOMIC_TYPE memory; /* Actual memory claim
highwater mark */
+
struct MALSTK *up; /* stack trace list */
struct MALBLK *blk; /* associated definition */
ValRecord stk[FLEXIBLE_ARRAY_MEMBER];
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -249,7 +249,6 @@ MCinitClientRecord(Client c, oid user, b
c->memorylimit = 0;
c->querytimeout = 0;
c->sessiontimeout = 0;
- c->workers = c->memory = 0;
c->itrace = 0;
c->errbuf = 0;
@@ -373,8 +372,6 @@ MCforkClient(Client father)
son->memorylimit = father->memorylimit;
son->querytimeout = father->querytimeout;
son->sessiontimeout = father->sessiontimeout;
- son->workers = father->workers;
- son->memory = father->memory;
if (son->prompt)
GDKfree(son->prompt);
@@ -444,7 +441,6 @@ MCfreeClient(Client c)
c->memorylimit = 0;
c->querytimeout = 0;
c->sessiontimeout = 0;
- c->workers = c->memory = 0;
c->user = oid_nil;
if( c->username){
GDKfree(c->username);
diff --git a/monetdb5/mal/mal_client.h b/monetdb5/mal/mal_client.h
--- a/monetdb5/mal/mal_client.h
+++ b/monetdb5/mal/mal_client.h
@@ -79,8 +79,6 @@ typedef struct CLIENT {
* For program debugging and performance trace we keep the actual
resource claims.
*/
time_t lastcmd; /* set when query is received */
- int workers; /* Actual number of concurrent workers
*/
- int memory; /* Actual memory claim highwater mark */
/* The user can request a TRACE SQL statement, calling for collecting
the events locally */
BAT *profticks;
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -380,7 +380,7 @@ DFLOWworker(void *T)
continue;
}
- if (MALrunningThreads() > 2 && MALadmission(cntxt,
fe->argclaim, fe->hotclaim)) {
+ if (MALrunningThreads() > 2 && MALadmission(flow->cntxt,
flow->stk, fe->argclaim, fe->hotclaim)) {
// never block on deblockdataflow()
p= getInstrPtr(flow->mb,fe->pc);
if( p->fcn != (MALfcn) deblockdataflow){
@@ -392,11 +392,13 @@ DFLOWworker(void *T)
continue;
}
}
+ (void) ATOMIC_INC(&flow->stk->workers);
error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc +
1, flow->stk, 0, 0);
+ (void) ATOMIC_DEC(&flow->stk->workers);
PARDEBUG fprintf(stderr, "#executed pc= %d wrk= %d claim= "
LLFMT "," LLFMT "," LLFMT " %s\n",
fe->pc, id, fe->argclaim,
fe->hotclaim, fe->maxclaim, error ? error : "");
/* release the memory claim */
- MALadmission(cntxt, -fe->argclaim, -fe->hotclaim);
+ MALadmission(flow->cntxt, flow->stk, -fe->argclaim,
-fe->hotclaim);
/* update the numa information. keep the thread-id producing
the value */
p= getInstrPtr(flow->mb,fe->pc);
for( i = 0; i < p->argc; i++)
diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c
--- a/monetdb5/mal/mal_interpreter.c
+++ b/monetdb5/mal/mal_interpreter.c
@@ -264,6 +264,8 @@ prepareMALstack(MalBlkPtr mb, int size)
//stk->stksize = size;
stk->stktop = mb->vtop;
stk->blk = mb;
+ stk->workers = ATOMIC_VAR_INIT(0);
+ stk->memory = ATOMIC_VAR_INIT(0);;
initStack(0, res);
if(!res) {
diff --git a/monetdb5/mal/mal_resource.c b/monetdb5/mal/mal_resource.c
--- a/monetdb5/mal/mal_resource.c
+++ b/monetdb5/mal/mal_resource.c
@@ -102,21 +102,28 @@ ATOMIC_TYPE mal_running = ATOMIC_VAR_INI
/* experiments on sf-100 on small machine showed no real improvement */
int
-MALadmission(Client cntxt, lng argclaim, lng hotclaim)
+MALadmission(Client cntxt, MalStkPtr stk, lng argclaim, lng hotclaim)
{
- (void) cntxt;
+ int workers;
+
+ (void) stk;
/* optimistically set memory */
if (argclaim == 0)
return 0;
MT_lock_set(&admissionLock);
/* Check if we are allowed a spawn another thread for this client */
- /* It is somewhat tricky, because we may be in a recursion, each of
which is counted for.
- * The MAL interpreter should trim the worker thread count as soon as
we call a FUNCTION/PATTERN recursively
+ /* It is somewhat tricky, because we may be in a dataflow recursion,
each of which is counted for.
+ * A way out is to attach the thread count to the MAL stacks instead.
*/
- if( cntxt->workerlimit && cntxt->workerlimit <= cntxt->workers){
+ workers = (int) ATOMIC_GET(&stk->workers);
+ if( cntxt->workerlimit){
+ if(cntxt->workerlimit <= workers){
PARDEBUG
- fprintf(stderr, "#DFLOWadmit check workers %d <= %d\n",
cntxt->workerlimit, cntxt->workers);
+ fprintf(stderr, "#DFLOWadmit check workers, not allowed
%d <= %d\n", cntxt->workerlimit, workers);
+ MT_lock_unset(&admissionLock);
+ return 0;
+ }
}
/* Determine if the total memory resource is exhausted */
if ( memoryclaims < 0)
diff --git a/monetdb5/mal/mal_resource.h b/monetdb5/mal/mal_resource.h
--- a/monetdb5/mal/mal_resource.h
+++ b/monetdb5/mal/mal_resource.h
@@ -21,7 +21,7 @@
#define heapinfo(X,Id) (((X) && (X)->base ) ? (X)->free : 0)
#define hashinfo(X,Id) ((X) && (X) != (Hash *) 1 ? heapinfo(&(X)->heap, Id) :
0)
-mal_export int MALadmission(Client cntxt, lng argclaim, lng hotclaim);
+mal_export int MALadmission(Client cntxt, MalStkPtr stk, lng argclaim, lng
hotclaim);
#define FAIRNESS_THRESHOLD (MAX_DELAYS * DELAYUNIT)
diff --git a/monetdb5/mal/mal_runtime.c b/monetdb5/mal/mal_runtime.c
--- a/monetdb5/mal/mal_runtime.c
+++ b/monetdb5/mal/mal_runtime.c
@@ -97,7 +97,6 @@ runtimeProfileInit(Client cntxt, MalBlkP
QRYqueue[i].query = q? GDKstrdup(q):0;
QRYqueue[i].status = "running";
QRYqueue[i].cntxt = cntxt;
- cntxt->workers++;
stk->tag = mb->tag = QRYqueue[i].tag;
}
qtop += i == qtop;
@@ -141,7 +140,6 @@ runtimeProfileFinish(Client cntxt, MalBl
qtop = j;
QRYqueue[qtop].query = NULL; /* sentinel for SYSMONqueue() */
- cntxt->workers--;
MT_lock_unset(&mal_delayLock);
}
diff --git a/monetdb5/modules/mal/sysmon.c b/monetdb5/modules/mal/sysmon.c
--- a/monetdb5/modules/mal/sysmon.c
+++ b/monetdb5/modules/mal/sysmon.c
@@ -30,6 +30,7 @@ SYSMONqueue(Client cntxt, MalBlkPtr mb,
bat *w = getArgReference_bat(stk,pci,7);
bat *m = getArgReference_bat(stk,pci,8);
lng i, qtag;
+ int wrk, mem;
str usr;
timestamp tsn;
str msg = MAL_SUCCEED;
@@ -90,9 +91,12 @@ SYSMONqueue(Client cntxt, MalBlkPtr mb,
}
if (BUNappend(started, &tsn, false) != GDK_SUCCEED)
goto bailout;
+
+ wrk = (int) ATOMIC_GET(&QRYqueue[i].stk->workers);
+ mem = (int) ATOMIC_GET(&QRYqueue[i].stk->memory);
if (BUNappend(progress, &QRYqueue[i].progress, false) !=
GDK_SUCCEED ||
- BUNappend(workers, &QRYqueue[i].cntxt->workers, false) !=
GDK_SUCCEED ||
- BUNappend(memory, &QRYqueue[i].cntxt->memory, false) !=
GDK_SUCCEED)
+ BUNappend(workers, &wrk, false) != GDK_SUCCEED ||
+ BUNappend(memory, &mem, false) != GDK_SUCCEED)
goto bailout;
}
MT_lock_unset(&mal_delayLock);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list