Changeset: f8f5217616a3 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f8f5217616a3
Modified Files:
monetdb5/mal/mal_interpreter.mx
Branch: default
Log Message:
Simplifying the scheduler
The administration of memory claims is simplified. It takes shape
at the end of each finished instruction.
However, when we run in full throttle, e.g. DFLOWadmission always holds,
then both sf-10 and sf-100 run good.
Also noticed that the serialisation condition recently added explains
for a large extend the performance degradation.
diffs (truncated from 536 to 300 lines):
diff --git a/monetdb5/mal/mal_interpreter.mx b/monetdb5/mal/mal_interpreter.mx
--- a/monetdb5/mal/mal_interpreter.mx
+++ b/monetdb5/mal/mal_interpreter.mx
@@ -64,7 +64,6 @@
mal_export void garbageCollector(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
int flag);
mal_export void releaseBAT(MalBlkPtr mb, MalStkPtr stk, int bid);
mal_export lng getVolume(MalStkPtr stk, InstrPtr pci, int rd);
-mal_export void getMemoryClaim(MalStkPtr stk, InstrPtr pci, lng *argclaim, lng
*retclaim, lng *hotclaim);
mal_export ptr getArgReference(MalStkPtr stk, InstrPtr pci, int k);
@@ -608,7 +607,8 @@
int blocks; /* awaiting for variables */
sht state; /* of execution */
sht cost;
- lng claim; /* memory foot print */
+ lng hotclaim; /* memory foot print of result variables */
+ lng argclaim; /* memory foot print of arguments */
str error;
} *FlowStatus, FlowStatusRec;
@@ -693,13 +693,44 @@
if ( (b->ttype == TYPE_oid && !BATtordered(b) && !BATtdense(b)) ||
( b->htype == TYPE_oid && !BAThordered(b) && !BAThdense(b)) ){
/* assume we may have to do random IO, punish it by increasing
the claim with hash elem size*/
- total += 4 * BATcount(b) ;
+ total += 4 * cnt;
total = total >(lng) (MEMORY_THRESHOLD * monet_memory)? (lng)
(MEMORY_THRESHOLD * monet_memory):total;
}
BBPunfix( h = b->batCacheid);
}
@c
static lng
+getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int i){
+ lng total=0, vol = 0;
+ BAT *b;
+ BUN cnt = 0;
+
+ (void) mb;
+ if (stk->stk[getArg(pci,i)].vtype == TYPE_bat){
+ b = BATdescriptor(stk->stk[getArg(pci,i)].val.bval);
+ if (b==NULL)
+ return 0;
+ @:calcFootprint@
+ /* prepare for hashes */
+ if ( b->H->hash == NULL && b->hsorted == 0 &&
+ b->T->hash == NULL && b->tsorted == 0 ){
+ if ( BATcount(b) > cnt)
+ cnt = BATcount(b);
+ }
+ if ( (b->ttype == TYPE_oid && !BATtordered(b) && !BATtdense(b))
||
+ ( b->htype == TYPE_oid && !BAThordered(b) && !BAThdense(b))
){
+ /* assume we may have to do random IO, punish it by
increasing the claim with hash elem size*/
+ total += 4 * cnt;
+ total = total >(lng) (MEMORY_THRESHOLD * monet_memory)?
(lng) (MEMORY_THRESHOLD * monet_memory):total;
+ }
+ BBPunfix( b->batCacheid);
+ }
+ PARDEBUG
+ mnstr_printf(GDKstdout,"#memory claim " LLFMT "\n",total);
+ return total;
+}
+
+static lng
getHotClaim()
{
lng total = 0;
@@ -714,42 +745,19 @@
hot potatoes it will eat.
@c
void
-getMemoryClaim(MalStkPtr stk, InstrPtr pci, lng *argclaim, lng *retclaim, lng
*hotclaim)
+getMemoryHotClaim(FlowStatus fs)
{
- int i, h = int_nil;
- lng guess=0, t=0, total = 0, vol=0;
- BUN cnt = 0;
- BAT *b;
+ int i, h, t;
+ InstrPtr pci = getInstrPtr(fs->mb, fs->pc);
- total = 0;
- if (retclaim)
- *retclaim =0;
- if (hotclaim)
- *hotclaim =0;
- if (argclaim){
- for(i= pci->retc; i< pci->argc; i++) {
- h = int_nil;
- @:calcclaim@
- if ( guess == 0)
- guess = total -t;
- /* claims on hotpotatoes are not counted */
- if (hotclaim)
- for (t =0; t< hottop; t++)
- if (hotpotatoes[t].bid == h)
- *hotclaim += hotpotatoes[t].claim;
- }
- /* make sure you can afford one hash when you have propCheck
enabled */
- if ( GDKdebug & 10 ){
- total += cnt * sizeof(BUN);
- }
- *argclaim = total;
+ fs->hotclaim = 0;
+ for(i= pci->retc; i< pci->argc; i++) {
+ if (fs->stk->stk[getArg(pci,i)].vtype == TYPE_bat)
+ h = fs->stk->stk[getArg(pci,i)].val.bval;
+ for (t =0; t< hottop; t++)
+ if (hotpotatoes[t].bid == h)
+ fs->hotclaim += hotpotatoes[t].claim;
}
- if ( retclaim && *retclaim == 0)
- *retclaim = guess * pci->retc;
-#ifdef DEBUG_MEMORY_CLAIM
- if ( total )
- mnstr_printf(GDKout,"#DFLOWgetMemoryClaim pool " LLFMT "
claims " LLFMT "," LLFMT "," LLFMT"\n",
memorypool,total,(retclaim?*retclaim:0),(hotclaim?*hotclaim:0));
-#endif
}
@-
After we have executed the instruction, we should release any hotpotatoe claim
(it is hot only once)
@@ -851,12 +859,13 @@
}
int
-DFLOWadmission(lng argclaim, lng retclaim, lng hotclaim)
+DFLOWadmission(lng argclaim, lng hotclaim)
{
lng hot;
/* optimistically set memory */
- if ( argclaim + retclaim == 0)
+ if ( argclaim == 0)
return 0;
+ return 0;
mal_set_lock(mal_contextLock, "DFLOWdelay");
if (memorypool <= 0 && memoryclaims == 0) {
@@ -864,23 +873,21 @@
hottop = 0;
}
- if ( argclaim + retclaim > 0 ) {
- if (memoryclaims == 0 || memorypool - memoryused > argclaim +
retclaim -hotclaim){
+ if ( argclaim > 0 ) {
+ if (memoryclaims == 0 || memorypool - memoryused > argclaim
-hotclaim){
hot = getHotClaim();
- if ( memoryclaims && hotclaim == 0 && argclaim +
retclaim > memorypool - memoryused - hot) {
+ if ( memoryclaims && hotclaim == 0 && argclaim >
memorypool - memoryused - hot) {
/* don't start unless you can eat the hot
potatoes first */
-#ifdef DEBUG_MEMORY_CLAIM
- mnstr_printf(GDKerr,"#Delayed due to hot
potatoes pool " LLFMT " used " LLFMT " hot " LLFMT "\n", memorypool,
memoryused,hot);
-#endif
+ PARDEBUG
+ mnstr_printf(GDKstdout,"#Delayed due to
hot potatoes pool " LLFMT " used " LLFMT " hot " LLFMT "\n", memorypool,
memoryused,hot);
mal_unset_lock(mal_contextLock, "DFLOWdelay");
return -1;
}
- memorypool -= (argclaim + retclaim);
+ memorypool -= argclaim;
memoryclaims ++;
-#ifdef DEBUG_MEMORY_CLAIM
- mnstr_printf(GDKout,"#DFLOWadmit %3d thread %d pool "
LLFMT","LLFMT " claims " LLFMT "," LLFMT"," LLFMT"\n",
- memoryclaims, THRgettid(), memorypool,
memoryused, argclaim, retclaim,hotclaim);
-#endif
+ PARDEBUG
+ mnstr_printf(GDKstdout,"#DFLOWadmit %3d thread
%d pool " LLFMT","LLFMT " claims " LLFMT "," LLFMT"\n",
+ memoryclaims, THRgettid(), memorypool,
memoryused, argclaim, hotclaim);
mal_unset_lock(mal_contextLock, "DFLOWdelay");
return 0;
}
@@ -888,13 +895,11 @@
return -1;
}
/* release memory claimed before */
- memorypool += -argclaim -retclaim ;
+ memorypool += -argclaim ;
memoryclaims --;
-#ifdef DEBUG_MEMORY_CLAIM
- mnstr_printf(GDKout,"#DFLOWadmit %3d thread %d pool "
LLFMT","LLFMT " claims " LLFMT "," LLFMT"," LLFMT"\n",
- memoryclaims, THRgettid(), memorypool, memoryused,
argclaim, retclaim,hotclaim);
- mnstr_flush(GDKout);
-#endif
+ PARDEBUG
+ mnstr_printf(GDKstdout,"#DFLOWadmit %3d thread %d pool "
LLFMT","LLFMT " claims " LLFMT "," LLFMT"\n",
+ memoryclaims, THRgettid(), memorypool, memoryused,
argclaim, hotclaim);
assert(memoryclaims >= 0 );
mal_unset_lock(mal_contextLock, "DFLOWdelay");
return 0;
@@ -935,17 +940,23 @@
}
*/
-/* keep a simple FIFO queue. It won't be a large one, so shuffles of requeue
is possible */
+/* keep a simple LIFO queue. It won't be a large one, so shuffles of requeue
is possible */
+/* we might actually sort it for better scheduling behavior */
static void
-q_enqueue(queue *q, void *d)
+q_enqueue_(queue *q, FlowStatus d)
{
- MT_lock_set(&q->l, "q_enqueue");
if (q->last == q->size) {
/* enlarge buffer */
q->size <<= 1;
q->data = GDKrealloc(q->data, sizeof(void*)*q->size);
}
- q->data[q->last++] = d;
+ q->data[q->last++] = (void*) d;
+}
+static void
+q_enqueue(queue *q, FlowStatus d)
+{
+ MT_lock_set(&q->l, "q_enqueue");
+ q_enqueue_(q, d);
MT_lock_unset(&q->l, "q_enqueue");
MT_sema_up(&q->s, "q_enqueue");
}
@@ -962,7 +973,7 @@
}
for ( i=q->last; i > 0; i--)
q->data[i]= q->data[i-1];
- q->data[0] = d;
+ q->data[0] = (void*) d;
q->last++;
MT_lock_unset(&q->l, "q_requeue");
MT_sema_up(&q->s, "q_requeue");
@@ -988,6 +999,18 @@
Beware, we assume that variables are assigned a value once, otherwise
the order may really create errors.
The order of the instructions should be retained as long as possible.
+@-
+Delay processing when we run out of memory. Push the instruction back
+on the end of queue, waiting for another attempt. Problem might become
+that all threads but one are cycling through the queue, each time
+finding an eligible instruction, but without enough space.
+Therefore, we wait for a few milliseconds as an initial punishment.
+
+The process could be refined by checking for cheap operations,
+i.e. those that would require no memory at all (aggr.count)
+This, however, would lead to a dependency to the upper layers,
+because in the kernel we don't know what routines are available
+with this property. Nor do we maintain such properties.
@c
static str
DFLOWstep(FlowTask *t, FlowStatus fs)
@@ -1014,7 +1037,6 @@
struct Mallinfo oldMemory;
MT_Lock *lock = &flow->done->l;
int tid = t->id, prevpc= 0;
- lng argclaim,retclaim,hotclaim;
#ifdef HAVE_SYS_RESOURCE_H
int oldinblock=0;
@@ -1065,63 +1087,17 @@
@:beginProfile(t,0)@
ret = MAL_SUCCEED;
@:MALrecycleStart(t)@ {
-@-
-Delay processing when we run out of memory. Push the instruction back
-on the end of queue, waiting for another attempt. Problem might become
-that all threads but one are cycling through the queue, each time
-finding an eligible instruction, but without enough space.
-Therefore, we wait for a few milliseconds as an initial punishment.
-
-The process could be refined by checking for cheap operations,
-i.e. those that would require no memory at all (aggr.count)
-This, however, would lead to a dependency to the upper layers,
-because in the kernel we don't know what routines are available
-with this property. Nor do we maintain such properties.
-@c
- getMemoryClaim(stk, pci, &argclaim, &retclaim,&hotclaim);
- if ( DFLOWadmission(argclaim,retclaim,hotclaim) ){
- PARDEBUG {
- mnstr_printf(GDKout,"#DFLOWdelay instruction %d
pool " LLFMT " claim "LLFMT"\n#", THRgettid(), memorypool, argclaim+retclaim);
- printInstruction(GDKstdout, mb, 0, pci,
LIST_MAL_STMT | LIST_MAPI);
- }
- for ( ; ; )
- {
- MT_sleep_ms(3); /* enough to pass some cheap
instructions */
- if ( DFLOWadmission(argclaim,retclaim,hotclaim)
== 0) {
- PARDEBUG {
- mnstr_printf(GDKout,"#DFLOWcont
thread %d delay " LLFMT " pool " LLFMT" delayed " LLFMT " ms\n#",
THRgettid(), memorypool, argclaim+retclaim, (GDKusec()-stk->clk)/1000);
- printInstruction(GDKstdout, mb,
0, pci, LIST_MAL_STMT | LIST_MAPI);
- }
- break;
- }
- /* if we claim only part of memory, then let's
wait */
- if (argclaim + retclaim -hotclaim < (lng)
(monet_memory / (memoryclaims+1)))
- continue;
- /* re-schedule instructions that require a lot
of memory for postponed execution */
- PARDEBUG {
- mnstr_printf(GDKout,"#DFLOWrequeue
instr %d pool " LLFMT " claim "LLFMT"\n#", THRgettid(), memorypool,
argclaim+retclaim);
- printInstruction(GDKstdout, mb, 0, pci,
LIST_MAL_STMT | LIST_MAPI);
- }
- /* actually should use DFLOWretry status */
- throw(MAL,"DFLOWadmission","failed");
- }
- }
- /* remove all result hotpotatoes now */
- updMemoryUsedPart(stk,pci,0,pci->retc,-1);
@:beginProfile(t,1)@
@-
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list