Changeset: f15d5695796f for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f15d5695796f
Modified Files:
clients/Tests/exports.stable.out
monetdb5/mal/Tests/performanceTests/run
monetdb5/mal/mal.h
monetdb5/mal/mal_client.c
monetdb5/mal/mal_client.h
monetdb5/mal/mal_dataflow.c
monetdb5/mal/mal_resource.c
monetdb5/mal/mal_resource.h
monetdb5/mal/mal_runtime.c
monetdb5/mal/mal_runtime.h
monetdb5/modules/mal/sysmon.c
sql/test/BugTracker-2017/Tests/sqlsmith.Bug-6219.stable.out
sql/test/BugTracker-2017/Tests/sqlsmith.Bug-6432.stable.err
sql/test/BugTracker-2017/Tests/sqlsmith.Bug-6432.stable.out
sql/test/BugTracker-2017/Tests/sqlsmith04.stable.out
sql/test/sys-schema/Tests/check_ForeignKey_referential_integrity.sql
sql/test/sys-schema/Tests/check_PrimaryKey_uniqueness.sql
sql/test/sys-schema/Tests/check_PrimaryKey_uniqueness.stable.out
Branch: sessions
Log Message:
Move the worker/memory status into the client record. Some cleanup, and
preparation for further (per-client) admission decisions.
diffs (truncated from 556 to 300 lines):
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1347,7 +1347,7 @@ str LIKEjoin(bat *r1, bat *r2, const bat
str LIKEjoin1(bat *r1, bat *r2, const bat *lid, const bat *rid, const bat
*slid, const bat *srid, const bit *nil_matches, const lng *estimate);
str MACROprocessor(Client cntxt, MalBlkPtr mb, Symbol t);
int MAL_MAXCLIENTS;
-int MALadmission(lng argclaim, lng hotclaim);
+int MALadmission(Client cntxt, lng argclaim, lng hotclaim);
str MALassertBit(void *ret, bit *val, str *msg);
str MALassertHge(void *ret, hge *val, str *msg);
str MALassertInt(void *ret, int *val, str *msg);
@@ -2356,8 +2356,6 @@ int mayhaveSideEffects(Client cntxt, Mal
void mdbDump(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
str mdbRef;
void mdbSetBreakRequest(Client cntxt, MalBlkPtr mb, str request, char cmd);
-int memoryclaims;
-lng memorypool;
str mergecandRef;
str mergepackRef;
str minRef;
diff --git a/monetdb5/mal/Tests/performanceTests/run
b/monetdb5/mal/Tests/performanceTests/run
--- a/monetdb5/mal/Tests/performanceTests/run
+++ b/monetdb5/mal/Tests/performanceTests/run
@@ -1,5 +1,7 @@
#!/bin/bash
+# NEEDS UPDATES
+
iter=5
tsts="base tst400a tst400bHuge tst400cHuge tst400d tst400e tst901a tst901b"
diff --git a/monetdb5/mal/mal.h b/monetdb5/mal/mal.h
--- a/monetdb5/mal/mal.h
+++ b/monetdb5/mal/mal.h
@@ -89,8 +89,6 @@ mal_export lng MALdebug;
mal_export char monet_cwd[FILENAME_MAX];
mal_export size_t monet_memory;
mal_export char monet_characteristics[4096];
-mal_export lng memorypool; /* memory claimed by
concurrent threads */
-mal_export int memoryclaims; /* number of threads active
with expensive operations */
mal_export stream *maleventstream;
#ifdef HAVE_HGE
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -249,6 +249,7 @@ MCinitClientRecord(Client c, oid user, b
c->memorylimit = 0;
c->querytimeout = 0;
c->sessiontimeout = 0;
+ c->workers = c->memory = 0;
c->itrace = 0;
c->errbuf = 0;
@@ -372,6 +373,8 @@ MCforkClient(Client father)
son->memorylimit = father->memorylimit;
son->querytimeout = father->querytimeout;
son->sessiontimeout = father->sessiontimeout;
+ son->workers = father->workers;
+ son->memory = father->memory;
if (son->prompt)
GDKfree(son->prompt);
@@ -441,6 +444,7 @@ MCfreeClient(Client c)
c->memorylimit = 0;
c->querytimeout = 0;
c->sessiontimeout = 0;
+ c->workers = c->memory = 0;
c->user = oid_nil;
if( c->username){
GDKfree(c->username);
diff --git a/monetdb5/mal/mal_client.h b/monetdb5/mal/mal_client.h
--- a/monetdb5/mal/mal_client.h
+++ b/monetdb5/mal/mal_client.h
@@ -78,7 +78,9 @@ typedef struct CLIENT {
/*
* For program debugging and performance trace we keep the actual
resource claims.
*/
- time_t lastcmd; /* set when query is received */
+ time_t lastcmd; /* set when query is received */
+ int workers; /* Actual number of concurrent workers
*/
+ int memory; /* Actual memory claim highwater mark */
/* The user can request a TRACE SQL statement, calling for collecting
the events locally */
BAT *profticks;
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -187,7 +187,6 @@ q_enqueue(Queue *q, FlowEvent d)
* that want to use a big recent result
*/
-#ifdef USE_MAL_ADMISSION
static void
q_requeue_(Queue *q, FlowEvent d)
{
@@ -216,7 +215,6 @@ q_requeue(Queue *q, FlowEvent d)
MT_lock_unset(&q->l);
MT_sema_up(&q->s);
}
-#endif
static FlowEvent
q_dequeue(Queue *q, Client cntxt)
@@ -382,8 +380,7 @@ DFLOWworker(void *T)
continue;
}
-#ifdef USE_MAL_ADMISSION
- if (MALrunningThreads() > 2 && MALadmission(fe->argclaim,
fe->hotclaim)) {
+ if (MALrunningThreads() > 2 && MALadmission(cntxt,
fe->argclaim, fe->hotclaim)) {
// never block on deblockdataflow()
p= getInstrPtr(flow->mb,fe->pc);
if( p->fcn != (MALfcn) deblockdataflow){
@@ -395,14 +392,11 @@ DFLOWworker(void *T)
continue;
}
}
-#endif
error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc +
1, flow->stk, 0, 0);
PARDEBUG fprintf(stderr, "#executed pc= %d wrk= %d claim= "
LLFMT "," LLFMT "," LLFMT " %s\n",
fe->pc, id, fe->argclaim,
fe->hotclaim, fe->maxclaim, error ? error : "");
-#ifdef USE_MAL_ADMISSION
/* release the memory claim */
- MALadmission(-fe->argclaim, -fe->hotclaim);
-#endif
+ MALadmission(cntxt, -fe->argclaim, -fe->hotclaim);
/* update the numa information. keep the thread-id producing
the value */
p= getInstrPtr(flow->mb,fe->pc);
for( i = 0; i < p->argc; i++)
@@ -428,7 +422,6 @@ DFLOWworker(void *T)
* because we hold the logical lock.
* All eligible instructions are queued
*/
-#ifdef USE_MAL_ADMISSION
{
InstrPtr p = getInstrPtr(flow->mb, fe->pc);
assert(p);
@@ -442,7 +435,6 @@ DFLOWworker(void *T)
if( footprint > fe->maxclaim) fe->maxclaim = footprint;
}
}
-#endif
MT_lock_set(&flow->flowlock);
for (last = fe->pc - flow->start; last >= 0 && (i =
flow->nodes[last]) > 0; last = flow->edges[last])
@@ -676,9 +668,6 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
fprintf(stderr, "\n");
}
}
-#ifdef USE_MAL_ADMISSION
- memorypool = memoryclaims = 0;
-#endif
return MAL_SUCCEED;
}
@@ -711,10 +700,8 @@ DFLOWscheduler(DataFlow flow, struct wor
{
int last;
int i;
-#ifdef USE_MAL_ADMISSION
int j;
InstrPtr p;
-#endif
int tasks=0, actions;
str ret = MAL_SUCCEED;
FlowEvent fe, f = 0;
@@ -730,7 +717,6 @@ DFLOWscheduler(DataFlow flow, struct wor
MT_lock_set(&flow->flowlock);
for (i = 0; i < actions; i++)
if (fe[i].blocks == 0) {
-#ifdef USE_MAL_ADMISSION
p = getInstrPtr(flow->mb,fe[i].pc);
if (p == NULL) {
MT_lock_unset(&flow->flowlock);
@@ -738,7 +724,6 @@ DFLOWscheduler(DataFlow flow, struct wor
}
for (j = p->retc; j < p->argc; j++)
fe[i].argclaim = getMemoryClaim(fe[0].flow->mb,
fe[0].flow->stk, p, j, FALSE);
-#endif
q_enqueue(todo, flow->status + i);
flow->status[i].state = DFLOWrunning;
PARDEBUG fprintf(stderr, "#enqueue pc=%d claim=" LLFMT
"\n", flow->status[i].pc, flow->status[i].argclaim);
diff --git a/monetdb5/mal/mal_resource.c b/monetdb5/mal/mal_resource.c
--- a/monetdb5/mal/mal_resource.c
+++ b/monetdb5/mal/mal_resource.c
@@ -13,8 +13,8 @@
#include "mal_private.h"
/* MEMORY admission does not seem to have a major impact */
-lng memorypool = 0; /* memory claimed by concurrent threads */
-int memoryclaims = 0; /* number of threads active with expensive operations
*/
+static lng memorypool = 0; /* memory claimed by concurrent threads */
+static int memoryclaims = 0;
void
mal_resource_reset(void)
@@ -88,53 +88,45 @@ getMemoryClaim(MalBlkPtr mb, MalStkPtr s
}
/*
- * A consequence of multiple threads is that they may claim more
- * space than available. This may cause GDKmalloc to fail.
- * In many cases this situation will be temporary, because
- * threads will ultimately release resources.
- * Therefore, we wait for it.
- *
- * Alternatively, a front-end can set the flow administration
- * program counter to -1, which leads to a soft abort.
- * [UNFORTUNATELY this approach does not (yet) work
- * because there seem to a possibility of a deadlock
- * between incref and bbptrim. Furthermore, we have
- * to be assured that the partial executed instruction
- * does not lead to ref-count errors.]
+ * The hotclaim indicates the amount of data recentely written as a result of
an operation.
+ * The argclaim provides a hint on how much we actually may need to execute
+ * The argclaim is the sum over the hotclaims for all arguments.
+ * The hotclaim is a hint on how large the result could be.
*
- * The worker produces a result which will potentially unblock
- * instructions. This it can find itself without the help of the scheduler
- * and without the need for a lock. (does it?, parallel workers?)
- * It could also give preference to an instruction that eats away the object
- * just produced. THis way it need not be saved on disk for a long time.
+ * The client context also keeps bounds on the memory claim/client.
+ * Surpassing this bound may be a reason to not admit the instruction to
proceed.
*/
-/*
- * The hotclaim indicates the amount of data recentely written.
- * as a result of an operation. The argclaim is the sum over the hotclaims
- * for all arguments.
- * The argclaim provides a hint on how much we actually may need to execute
- * The hotclaim is a hint how large the result would be.
- */
-#ifdef USE_MAL_ADMISSION
static MT_Lock admissionLock = MT_LOCK_INITIALIZER("admissionLock");
+ATOMIC_TYPE mal_running = ATOMIC_VAR_INIT(0);
/* experiments on sf-100 on small machine showed no real improvement */
+
int
-MALadmission(lng argclaim, lng hotclaim)
+MALadmission(Client cntxt, lng argclaim, lng hotclaim)
{
+ (void) cntxt;
/* optimistically set memory */
if (argclaim == 0)
return 0;
MT_lock_set(&admissionLock);
- if (memoryclaims < 0)
+ /* Check if we are allowed a spawn another thread for this client */
+ /* It is somewhat tricky, because we may be in a recursion, each of
which is counted for.
+ * The MAL interpreter should trim the worker thread count as soon as
we call a FUNCTION/PATTERN recursively
+ */
+ if( cntxt->workerlimit && cntxt->workerlimit <= cntxt->workers){
+ PARDEBUG
+ fprintf(stderr, "#DFLOWadmit check workers %d <= %d\n",
cntxt->workerlimit, cntxt->workers);
+ }
+ /* Determine if the total memory resource is exhausted */
+ if ( memoryclaims < 0)
memoryclaims = 0;
- if (memorypool <= 0 && memoryclaims == 0)
+ if ( memorypool <= 0 && memoryclaims == 0)
memorypool = (lng)(MEMORY_THRESHOLD );
if (argclaim > 0) {
- if (memoryclaims == 0 || memorypool > argclaim + hotclaim) {
- memorypool -= (argclaim + hotclaim);
+ if ( memoryclaims == 0 || memorypool > argclaim + hotclaim) {
+ memorypool -= (lng) (argclaim + hotclaim);
memoryclaims++;
PARDEBUG
fprintf(stderr, "#DFLOWadmit %3d thread %d pool " LLFMT
"claims " LLFMT "," LLFMT "\n",
@@ -143,12 +135,13 @@ MALadmission(lng argclaim, lng hotclaim)
return 0;
}
PARDEBUG
- fprintf(stderr, "#Delayed due to lack of memory " LLFMT "
requested " LLFMT " memoryclaims %d\n", memorypool, argclaim + hotclaim,
memoryclaims);
+ fprintf(stderr, "#Delayed due to lack of memory " LLFMT "
requested " LLFMT " memoryclaims %d\n",
+ memorypool, argclaim + hotclaim, memoryclaims);
MT_lock_unset(&admissionLock);
return -1;
}
/* release memory claimed before */
- memorypool += -argclaim - hotclaim;
+ memorypool += (lng) (-argclaim - hotclaim);
memoryclaims--;
PARDEBUG
fprintf(stderr, "#DFLOWadmit %3d thread %d pool " LLFMT " claims "
LLFMT "," LLFMT "\n",
@@ -156,19 +149,17 @@ MALadmission(lng argclaim, lng hotclaim)
MT_lock_unset(&admissionLock);
return 0;
}
-#endif
-/* Delay a thread if too much competition arises and memory becomes a scarce
resource.
- * If in the mean time memory becomes free, or too many sleeping re-enable
worker.
- * It may happen that all threads enter the wait state. So, keep one running
at all time
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list