Changeset: c7014011aa01 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=c7014011aa01
Added Files:
        monetdb5/mal/mal_resource.c
        monetdb5/mal/mal_resource.h
Modified Files:
        monetdb5/mal/Makefile.ag
        monetdb5/mal/mal.h
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_dataflow.h
        monetdb5/mal/mal_instruction.c
        monetdb5/mal/mal_instruction.h
        monetdb5/mal/mal_interpreter.c
Branch: default
Log Message:

Multiuser resource management
Under multi-query load there comes a point that we run out of
memory resources. Then the interpreter threads are being
put to sleep, waiting for the rss memory level to drop sufficiently.
Any MALblock that runs for more then 2 seconds becomes a subject
to this policy.
The most common situation is in parallel processing a single query.
Then workers are delayed to give the system room to reduce the active
memory footprint.


diffs (truncated from 654 to 300 lines):

diff --git a/monetdb5/mal/Makefile.ag b/monetdb5/mal/Makefile.ag
--- a/monetdb5/mal/Makefile.ag
+++ b/monetdb5/mal/Makefile.ag
@@ -40,6 +40,7 @@ lib_mal = {
                mal_import.c mal_import.h \
                mal_runtime.c mal_runtime.h \
                mal_instruction.c mal_instruction.h \
+               mal_resource.c mal_resource.h \
                mal_interpreter.c mal_interpreter.h \
                mal_dataflow.c mal_dataflow.h \
                mal_linker.c mal_linker.h \
diff --git a/monetdb5/mal/mal.h b/monetdb5/mal/mal.h
--- a/monetdb5/mal/mal.h
+++ b/monetdb5/mal/mal.h
@@ -46,7 +46,6 @@
  */
 #define MAXSCRIPT 64
 #define MEMORY_THRESHOLD  0.8
-#define DELAYUNIT 100 /* ms delay in parallel processing decissions */
 
 mal_export char     monet_cwd[PATHLENGTH];
 mal_export size_t      monet_memory;
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -84,10 +84,6 @@ typedef struct DATAFLOW {
 static MT_Id workers[THREADS];
 static queue *todo = 0;        /* pending instructions */
 
-/* does not seem to have a major impact */
-lng memorypool = 0;      /* memory claimed by concurrent threads */
-int memoryclaims = 0;    /* number of threads active with expensive operations 
*/
-
 /*
  * Calculate the size of the dataflow dependency graph.
  */
@@ -103,123 +99,6 @@ DFLOWgraphSize(MalBlkPtr mb, int start, 
 }
 
 /*
- * Running all eligible instructions in parallel creates
- * resource contention. This means we should implement
- * an admission control scheme where threads are temporarily
- * postponed if the claim for memory exceeds a threshold
- * In general such contentions will be hard to predict,
- * because they depend on the algorithm, the input sizes,
- * concurrent use of the same variables, and the output produced.
- *
- * The heuristic is based on calculating the storage footprint
- * of the operands and assuming it preferrably should fit in memory.
- * Ofcourse, there may be intermediate structures being
- * used and the size of the result is not a priori known.
- * For this, we use a high watermark on the amount of
- * physical memory we pre-allocate for the claims.
- *
- * Instructions are eligible to be executed when the
- * total footprint of all concurrent executions stays below
- * the high-watermark or it is the single expensive
- * instruction being started.
- *
- * When we run out of memory, the instruction is delayed.
- * How long depends on the other instructions to free up
- * resources. The current policy simple takes a local
- * decision by delaying the instruction based on its
- * past and the size of the memory pool size.
- * The waiting penalty decreases with each step to ensure
- * it will ultimately taken into execution, with possibly
- * all resource contention effects.
- *
- * Another option would be to maintain a priority queue of
- * suspended instructions.
- */
-
-/*
- * The memory claim is the estimate for the amount of memory hold.
- * Views are consider cheap and ignored
- */
-lng
-getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, int pc, int i, int flag)
-{
-       lng total = 0, vol = 0;
-       BAT *b;
-       InstrPtr pci = getInstrPtr(mb,pc);
-
-       (void)mb;
-       if (stk->stk[getArg(pci, i)].vtype == TYPE_bat) {
-               b = BATdescriptor(stk->stk[getArg(pci, i)].val.bval);
-               if (b == NULL)
-                       return 0;
-               if (flag && isVIEW(b)) {
-                       BBPunfix(b->batCacheid);
-                       return 0;
-               }
-               heapinfo(&b->H->heap); total += vol;
-               heapinfo(b->H->vheap); total += vol;
-               hashinfo(b->H->hash); total += vol;
-
-               heapinfo(&b->T->heap); total += vol;
-               heapinfo(b->T->vheap); total += vol;
-               hashinfo(b->T->hash); total += vol;
-               if ( b->T->hash == 0  || b->H->hash ==0)        /* assume one 
hash claim */
-                       total+= BATcount(b) * sizeof(lng);
-               total = total > (lng)(MEMORY_THRESHOLD * monet_memory) ? 
(lng)(MEMORY_THRESHOLD * monet_memory) : total;
-               BBPunfix(b->batCacheid);
-       }
-       return total;
-}
-
-/*
- * The hotclaim indicates the amount of data recentely written.
- * as a result of an operation. The argclaim is the sum over the hotclaims
- * for all arguments.
- * The argclaim provides a hint on how much we actually may need to execute
- * The hotclaim is a hint how large the result would be.
- */
-#ifdef USE_DFLOW_ADMISSION
-/* experiments on sf-100 on small machine showed no real improvement */
-int
-DFLOWadmission(lng argclaim, lng hotclaim)
-{
-       /* optimistically set memory */
-       if (argclaim == 0)
-               return 0;
-
-       MT_lock_set(&mal_contextLock, "DFLOWdelay");
-       if (memoryclaims < 0)
-               memoryclaims = 0;
-       if (memorypool <= 0 && memoryclaims == 0)
-               memorypool = (lng)(MEMORY_THRESHOLD * monet_memory);
-
-       if (argclaim > 0) {
-               if (memoryclaims == 0 || memorypool > argclaim + hotclaim) {
-                       memorypool -= (argclaim + hotclaim);
-                       memoryclaims++;
-                       PARDEBUG
-                       mnstr_printf(GDKstdout, "#DFLOWadmit %3d thread %d pool 
" LLFMT "claims " LLFMT "," LLFMT "\n",
-                                                memoryclaims, THRgettid(), 
memorypool, argclaim, hotclaim);
-                       MT_lock_unset(&mal_contextLock, "DFLOWdelay");
-                       return 0;
-               }
-               PARDEBUG
-               mnstr_printf(GDKstdout, "#Delayed due to lack of memory " LLFMT 
" requested " LLFMT " memoryclaims %d\n", memorypool, argclaim + hotclaim, 
memoryclaims);
-               MT_lock_unset(&mal_contextLock, "DFLOWdelay");
-               return -1;
-       }
-       /* release memory claimed before */
-       memorypool += -argclaim - hotclaim;
-       memoryclaims--;
-       PARDEBUG
-       mnstr_printf(GDKstdout, "#DFLOWadmit %3d thread %d pool " LLFMT " 
claims " LLFMT "," LLFMT "\n",
-                                memoryclaims, THRgettid(), memorypool, 
argclaim, hotclaim);
-       MT_lock_unset(&mal_contextLock, "DFLOWdelay");
-       return 0;
-}
-#endif
-
-/*
  * The dataflow execution is confined to a barrier block.
  * Within the block there are multiple flows, which, in principle,
  * can be executed in parallel.
@@ -345,75 +224,6 @@ q_dequeue(queue *q)
  * with this property. Nor do we maintain such properties.
  */
 
-/*
- * A consequence of multiple threads is that they may claim more
- * space than available. This may cause GDKmalloc to fail.
- * In many cases this situation will be temporary, because
- * threads will ultimately release resources.
- * Therefore, we wait for it.
- *
- * Alternatively, a front-end can set the flow administration
- * program counter to -1, which leads to a soft abort.
- * [UNFORTUNATELY this approach does not (yet) work
- * because there seem to a possibility of a deadlock
- * between incref and bbptrim. Furthermore, we have
- * to be assured that the partial executed instruction
- * does not lead to ref-count errors.]
- *
- * The worker produces a result which will potentially unblock
- * instructions. This it can find itself without the help of the scheduler
- * and without the need for a lock. (does it?, parallel workers?)
- * It could also give preference to an instruction that eats away the object
- * just produced. THis way it need not be saved on disk for a long time.
- */
-static int asleep = 0;
-
-/* Thread chain context switch decision and multi-user balancing. */
-/* Delay the threads if too much competition arises and memory
- * becomes scarce */
-/* If in the mean time memory becomes free, or too many sleep,
- * re-enable worker */
-/* It may happen that all threads enter the wait state. So, keep
- * one running at all time */
-/* this routine can be extended to manage multiple query streams.
- * By keeping the query start time in the client record we can delay
- * them when resource stress occurs.
- */
-static void
-MALresourceFairness(Client cntxt, lng usec)
-{
-       long rss = MT_getrss();
-       long delay, clk = (GDKusec() - usec) / 1000;
-       double factor = 1.0;
-       if (rss > MEMORY_THRESHOLD * monet_memory && clk > DELAYUNIT && 
todo->last) {
-               MT_lock_set(&mal_delayLock, "runMALdataflow");
-               asleep++;
-               MT_lock_unset(&mal_delayLock, "runMALdataflow");
-
-               PARDEBUG mnstr_printf(GDKstdout, "#delay %d initial %ld\n", 
cntxt->idx, clk);
-               while (clk > 0) {
-                       /* always keep two running to avoid all waiting for
-                        * a chain context switch */
-                       if (asleep >= GDKnr_threads - 1)
-                               break;
-                       /* speed up wake up when we have memory or too many 
sleepers */
-                       rss = MT_getrss();
-                       if (rss < MEMORY_THRESHOLD * monet_memory)
-                               break;
-                       factor = ((double) rss) / (MEMORY_THRESHOLD * 
monet_memory);
-                       delay = (long) (DELAYUNIT * (factor > 1.0 ? 1.0 : 
factor));
-                       delay = (long) (delay * (1.0 - (asleep - 1) / 
GDKnr_threads));
-                       if (delay)
-                               MT_sleep_ms(delay);
-                       clk -= DELAYUNIT;
-               }
-               MT_lock_set(&mal_delayLock, "runMALdataflow");
-               asleep--;
-               MT_lock_unset(&mal_delayLock, "runMALdataflow");
-               PARDEBUG mnstr_printf(GDKstdout, "#delayed finished thread %d 
asleep %d\n", cntxt->idx, asleep);
-       }
-}
-
 static void
 DFLOWworker(void *t)
 {
@@ -442,8 +252,8 @@ DFLOWworker(void *t)
                        continue;
                }
 
-#ifdef USE_DFLOW_ADMISSION
-               if (DFLOWadmission(fe->argclaim, fe->hotclaim)) {
+#ifdef USE_MAL_ADMISSION
+               if (MALadmission(fe->argclaim, fe->hotclaim)) {
                        fe->hotclaim = 0;   /* don't assume priority anymore */
                        if (todo->last == 0)
                                MT_sleep_ms(DELAYUNIT);
@@ -481,9 +291,9 @@ DFLOWworker(void *t)
 
                PARDEBUG mnstr_printf(GDKstdout, "#execute pc= %d wrk= %d 
finished %s\n", fe->pc, id, flow->error ? flow->error : "");
 
-#ifdef USE_DFLOW_ADMISSION
+#ifdef USE_MAL_ADMISSION
                /* release the memory claim */
-               DFLOWadmission(-fe->argclaim, -fe->hotclaim);
+               MALadmission(-fe->argclaim, -fe->hotclaim);
 #endif
 
                /* see if you can find an eligible instruction that uses the
@@ -492,14 +302,14 @@ DFLOWworker(void *t)
                 * are safe from concurrent actions.
                 * All eligible instructions are queued
                 */
-#ifdef USE_DFLOW_ADMISSION
+#ifdef USE_MAL_ADMISSION
                fe->hotclaim = 0;
                p = getInstrPtr(flow->mb, fe->pc);
                for (i = 0; i < p->retc; i++)
                        fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, 
fe->pc, i, FALSE);
 #endif
                q_enqueue(flow->done, fe);
-               MALresourceFairness(flow->cntxt, usec);
+               MALresourceFairness(flow->cntxt, flow->mb, usec);
        }
        GDKfree(GDKerrbuf);
        GDKsetbuf(0);
@@ -623,7 +433,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
                }
                mnstr_printf(GDKstdout, "\n");
        }
-#ifdef USE_DFLOW_ADMISSION
+#ifdef USE_MAL_ADMISSION
        memorypool = memoryclaims = 0;
 #endif
 }
@@ -655,7 +465,7 @@ DFLOWscheduler(DataFlow flow)
 {
        int queued = 0, last;
        int i, pc = 0;
-#ifdef USE_DFLOW_ADMISSION
+#ifdef USE_MAL_ADMISSION
        int j;
        InstrPtr p;
 #endif
@@ -674,7 +484,7 @@ DFLOWscheduler(DataFlow flow)
        /* enter all dependencies before releasing the queue  */
        for (i = 0; i < actions; i++)
                if (fe[i].blocks == 0) {
-#ifdef USE_DFLOW_ADMISSION
+#ifdef USE_MAL_ADMISSION
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to