Changeset: c1b5a6530528 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=c1b5a6530528
Modified Files:
monetdb5/mal/mal_interpreter.c
monetdb5/mal/mal_resource.c
monetdb5/mal/mal_resource.h
monetdb5/mal/mal_runtime.c
Branch: default
Log Message:
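Improve multi-user query scheduling.
After 3 minutes of processing, a query becomes subject
to fairness scheduling: while memory is scarce and other
users are active, it is delayed in short steps (at most
about 2 seconds after each instruction), so that other
queries can proceed a little faster.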
diffs (227 lines):
diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c
--- a/monetdb5/mal/mal_interpreter.c
+++ b/monetdb5/mal/mal_interpreter.c
@@ -823,8 +823,6 @@ str runMALsequence(Client cntxt, MalBlkP
/* If needed recycle intermediate result */
if (pci->recycle > 0)
RECYCLEexit(cntxt, mb, stk, pci,
&runtimeProfile);
- if ( cntxt->idx > 1 )
- MALresourceFairness(GDKusec()-
mb->starttime);
/* general garbage collection */
if (ret == MAL_SUCCEED && garbageControl(pci)) {
diff --git a/monetdb5/mal/mal_resource.c b/monetdb5/mal/mal_resource.c
--- a/monetdb5/mal/mal_resource.c
+++ b/monetdb5/mal/mal_resource.c
@@ -158,78 +158,77 @@ MALadmission(lng argclaim, lng hotclaim)
}
#endif
-/* Delay threads if too much competition arises and memory
- * becomes a scarce resource.
- * If in the mean time memory becomes free, or too many sleeping
- * re-enable worker.
- * It may happen that all threads enter the wait state. So, keep
- * one running at all time
+/* Delay threads if too much competition arises and memory becomes a scarce
resource.
+ * If in the mean time memory becomes free, or too many sleeping re-enable
worker.
+ * It may happen that all threads enter the wait state. So, keep one running
at all time
* By keeping the query start time in the client record we can delay
* them when resource stress occurs.
*/
-#include "gdk_atomic.h"
-static volatile ATOMIC_TYPE running;
+ATOMIC_TYPE mal_running;
#ifdef ATOMIC_LOCK
-static MT_Lock runningLock MT_LOCK_INITIALIZER("runningLock");
+MT_Lock mal_runningLock MT_LOCK_INITIALIZER("mal_runningLock");
#endif
void
MALresourceFairness(lng usec)
{
+#ifdef FAIRNESS_THRESHOLD
size_t rss;
- unsigned int delay;
lng clk;
- int threads;
int delayed= 0;
+ int users = 2;
+
+ if ( usec <= TIMESLICE)
+ return;
/* use GDKmem_cursize as MT_getrss() is too expensive */
rss = GDKmem_cursize();
/* ample of memory available*/
- if ( rss < MEMORY_THRESHOLD && usec <= TIMESLICE)
+ if ( rss <= MEMORY_THRESHOLD )
return;
/* worker reporting time spent in usec! */
clk = usec / 1000;
- if ( clk > DELAYUNIT ) {
- PARDEBUG mnstr_printf(GDKstdout, "#delay initial "LLFMT"n",
clk);
- (void) ATOMIC_DEC(running, runningLock);
- /* always keep one running to avoid all waiting */
- while (clk > 0 && running >= 2 && delayed < MAX_DELAYS) {
- /* speed up wake up when we have memory */
- if (rss < MEMORY_THRESHOLD )
+ /* cap the maximum penalty */
+ clk = clk > FAIRNESS_THRESHOLD? FAIRNESS_THRESHOLD:clk;
+
+ /* always keep one running to avoid all waiting */
+ while (clk > DELAYUNIT && users > 1 && mal_running > (size_t)
GDKnr_threads && rss > MEMORY_THRESHOLD) {
+ if ( delayed++ == 0){
+ PARDEBUG mnstr_printf(GDKstdout, "#delay
initial ["LLFMT"] memory "SZFMT"[%f]\n", clk, rss, MEMORY_THRESHOLD );
+ PARDEBUG mnstr_flush(GDKstdout);
+ }
+ if ( delayed == MAX_DELAYS){
+ PARDEBUG mnstr_printf(GDKstdout, "#delay abort
["LLFMT"] memory "SZFMT"[%f]\n", clk, rss, MEMORY_THRESHOLD );
+ PARDEBUG mnstr_flush(GDKstdout);
break;
- threads = GDKnr_threads > 0 ? GDKnr_threads : 1;
- delay = (unsigned int) ( ((double)DELAYUNIT * running)
/ threads) + 1;
- if (delay) {
- if ( delayed++ == 0){
- PARDEBUG
mnstr_printf(GDKstdout, "#delay initial %u["LLFMT"] memory "SZFMT"[%f]\n",
delay, clk, rss, MEMORY_THRESHOLD );
- PARDEBUG mnstr_flush(GDKstdout);
- }
- MT_sleep_ms(delay);
- rss = GDKmem_cursize();
- } else break;
- clk -= DELAYUNIT;
}
- (void) ATOMIC_INC(running, runningLock);
+ MT_sleep_ms(DELAYUNIT);
+ users= MCactiveClients(); // users excluding console
+ rss = GDKmem_cursize();
+ clk -= DELAYUNIT;
}
+#else
+(void) usec;
+#endif
}
// Get a hint on the parallel behavior
size_t
MALrunningThreads(void)
{
- return running;
+ return mal_running;
}
void
initResource(void)
{
#ifdef NEED_MT_LOCK_INIT
- ATOMIC_INIT(runningLock);
+ ATOMIC_INIT(mal_runningLock);
#ifdef USE_MAL_ADMISSION
MT_lock_init(&admissionLock, "admissionLock");
#endif
#endif
- running = (ATOMIC_TYPE) GDKnr_threads;
+ mal_running = (ATOMIC_TYPE) GDKnr_threads;
}
diff --git a/monetdb5/mal/mal_resource.h b/monetdb5/mal/mal_resource.h
--- a/monetdb5/mal/mal_resource.h
+++ b/monetdb5/mal/mal_resource.h
@@ -10,10 +10,15 @@
#define _MAL_RESOURCE_H
#include "mal_interpreter.h"
+#include "gdk_atomic.h"
+mal_export ATOMIC_TYPE mal_running;
+#ifdef ATOMIC_LOCK
+mal_export MT_Lock mal_runningLock;
+#endif
-#define TIMESLICE 2000000 /* usec */
+#define TIMESLICE (3 * 60 * 1000 * 1000) /* usec , 3 minute high priority */
#define DELAYUNIT 2 /* ms delay in parallel processing decisions */
-#define MAX_DELAYS 1000 /* never wait forever */
+#define MAX_DELAYS 1000 /* never wait more then 2000 ms */
//#define heapinfo(X,Id) (((X) && (X)->base && ((X)->parentid == 0 ||
(X)->parentid == Id)) ? (X)->free : 0)
#define heapinfo(X,Id) (((X) && (X)->base ) ? (X)->free : 0)
@@ -24,6 +29,8 @@
mal_export int MALadmission(lng argclaim, lng hotclaim);
#endif
+#define FAIRNESS_THRESHOLD MAX_DELAYS * DELAYUNIT
+
mal_export lng getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int
i, int flag);
mal_export void MALresourceFairness(lng usec);
mal_export size_t MALrunningThreads(void);
diff --git a/monetdb5/mal/mal_runtime.c b/monetdb5/mal/mal_runtime.c
--- a/monetdb5/mal/mal_runtime.c
+++ b/monetdb5/mal/mal_runtime.c
@@ -19,10 +19,9 @@
#include "mal_profiler.h"
#include "mal_listing.h"
#include "mal_authorize.h"
+#include "mal_resource.h"
#include "mal_private.h"
-#define heapinfo(X) ((X) && (X)->base ? (X)->free: 0)
-#define hashinfo(X) ( (X)? heapinfo((X)->heap):0)
// Keep a queue of running queries
QueryQueue QRYqueue;
@@ -155,6 +154,7 @@ runtimeProfileBegin(Client cntxt, MalBlk
{
int tid = THRgettid();
+ assert(pci);
/* keep track on the instructions taken in progress */
cntxt->active = TRUE;
if( tid < THREADS){
@@ -166,8 +166,12 @@ runtimeProfileBegin(Client cntxt, MalBlk
/* always collect the MAL instruction execution time */
gettimeofday(&pci->clock,NULL);
prof->ticks = GDKusec();
+
+ /* keep track of actual running instructions over BATs */
+ if( isaBatType(getArgType(mb, pci, 0)) )
+ (void) ATOMIC_INC(mal_running, mal_runningLock);
+
/* emit the instruction upon start as well */
-
if(malProfileMode > 0)
profilerEvent(mb, stk, pci, TRUE, cntxt->username);
}
@@ -185,6 +189,9 @@ runtimeProfileExit(Client cntxt, MalBlkP
}
assert(pci);
+ if( isaBatType(getArgType(mb, pci, 0)) )
+ (void) ATOMIC_DEC(mal_running, mal_runningLock);
+
assert(prof);
/* always collect the MAL instruction execution time */
pci->ticks = GDKusec() - prof->ticks;
@@ -203,6 +210,9 @@ runtimeProfileExit(Client cntxt, MalBlkP
malProfileMode = 1;
}
cntxt->active = FALSE;
+ /* reduce threads of non-admin long running transaction if needed */
+ if ( cntxt->idx > 1 )
+ MALresourceFairness(GDKusec()- mb->starttime);
}
/*
@@ -218,9 +228,9 @@ getBatSpace(BAT *b){
lng space=0;
if( b == NULL)
return 0;
- if( b->T) space += heapinfo(&b->T->heap);
- if( b->T->vheap) space += heapinfo(b->T->vheap);
- if(b->T) space += hashinfo(b->T->hash);
+ space += BATcount(b) * b->T->width;
+ if( b->T->vheap) space += heapinfo(b->T->vheap, abs(b->batCacheid));
+ if(b->T) space += hashinfo(b->T->hash, abs(b->batCacheid));
space += IMPSimprintsize(b);
return space;
}
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list