Changeset: 032cd11701b8 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=032cd11701b8
Modified Files:
gdk/gdk_utils.c
monetdb5/mal/mal_dataflow.c
monetdb5/mal/mal_instruction.c
monetdb5/modules/mal/batExtensions.c
monetdb5/optimizer/opt_multicore.c
monetdb5/scheduler/Makefile.ag
monetdb5/scheduler/mut_policy.c
monetdb5/scheduler/mut_stopRuns.c
monetdb5/scheduler/mut_transforms.c
monetdb5/scheduler/run_multicore.c
monetdb5/scheduler/run_multicore.h
Branch: mutation
Log Message:
macros added / corrected for join and select operators, cleaned up the code,
new algorithm heuristic tunings added
diffs (truncated from 873 to 300 lines):
diff --git a/gdk/gdk_utils.c b/gdk/gdk_utils.c
--- a/gdk/gdk_utils.c
+++ b/gdk/gdk_utils.c
@@ -919,12 +919,14 @@ GDKvmtrim(void *limit)
MEMDEBUG THRprintf(GDKstdout, "alloc = " SZFMT " %+zd rss = "
SZFMT " %+zd\n", cursize, memdiff, rss, rssdiff);
prevmem = cursize;
prevrss = rss;
- if (memdiff >= 0 && rssdiff < -32 * (ssize_t) MT_pagesize()) {
- BBPtrim(rss);
- highload = 1;
- } else {
- highload = 0;
- }
+
+ if (rss > 0.8 * (ssize_t) MT_npages() * MT_pagesize()) {
+ BBPtrim(0.2 * (ssize_t) MT_npages() * MT_pagesize());
+ highload = 1;
+ } else {
+ highload = 0;
+ }
+
} while (!GDKexiting());
}
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -231,7 +231,7 @@ static int stick_this_thread_to_core(int
pthread_t current_thread;
cpu_set_t cpuset;
- int num_cores = sysconf(_SC_NPROCESSORS_ONLN);
+ int num_cores = MT_check_nr_cores();
if (core_id >= num_cores)
{
diff --git a/monetdb5/mal/mal_instruction.c b/monetdb5/mal/mal_instruction.c
--- a/monetdb5/mal/mal_instruction.c
+++ b/monetdb5/mal/mal_instruction.c
@@ -1624,12 +1624,12 @@ void
delArgument(InstrPtr p, int idx)
{
int i;
-
for (i = idx; i < p->argc - 1; i++)
p->argv[i] = p->argv[i + 1];
p->argc--;
if (idx < p->retc)
p->retc--;
+
}
void
diff --git a/monetdb5/modules/mal/batExtensions.c
b/monetdb5/modules/mal/batExtensions.c
--- a/monetdb5/modules/mal/batExtensions.c
+++ b/monetdb5/modules/mal/batExtensions.c
@@ -262,7 +262,6 @@ CMDbatpartition2(Client cntxt, MalBlkPtr
hval = lval+step;
bn = BATslice(b, lval,hval);
BATseqbase(bn, lval + b->hseqbase) ;
-// BATseqbase(bn, lval) ;
if (bn== NULL){
BBPreleaseref(b->batCacheid);
throw(MAL, "bat.partition", INTERNAL_OBJ_CREATE);
diff --git a/monetdb5/optimizer/opt_multicore.c
b/monetdb5/optimizer/opt_multicore.c
--- a/monetdb5/optimizer/opt_multicore.c
+++ b/monetdb5/optimizer/opt_multicore.c
@@ -43,15 +43,20 @@ OPTmulticoreImplementation(Client cntxt,
limit= mb->stop;
slimit = mb->ssize;
- mb->vsize = 700;
- new = (VarPtr *) GDKzalloc(mb->vsize * sizeof(VarPtr));
- if (new == NULL) {
- GDKerror("varPtrAlloc:" MAL_MALLOC_FAIL);
- return -1;
- }
- memcpy((char *) new, (char *) mb->var, sizeof(VarPtr) * mb->vtop);
- GDKfree(mb->var);
- mb->var = new;
+ if(mb->vsize < 700)
+ {
+ mb->vsize = 700;
+ new = (VarPtr *) GDKzalloc(mb->vsize * sizeof(VarPtr));
+ if (new == NULL)
+ {
+ // throw(MAL,"varPtrAlloc","Internal error");
+ GDKerror("varPtrAlloc:" MAL_MALLOC_FAIL);
+ return -1;
+ }
+ memcpy((char *) new, (char *) mb->var, sizeof(VarPtr) *
mb->vtop);
+ GDKfree(mb->var);
+ mb->var = new;
+ }
if ( newMalBlkStmt(mb, mb->ssize) < 0)
return 0;
diff --git a/monetdb5/scheduler/Makefile.ag b/monetdb5/scheduler/Makefile.ag
--- a/monetdb5/scheduler/Makefile.ag
+++ b/monetdb5/scheduler/Makefile.ag
@@ -47,4 +47,4 @@ headers_mal = {
}
EXTRA_DIST_DIR = Tests
-EXTRA_DIST = run_adder.mal run_isolate.mal run_mutation.mal run_memo.mal
run_octopus.mal srvpool.mal
+EXTRA_DIST = run_adder.mal run_isolate.mal run_multicore.mal run_memo.mal
run_octopus.mal srvpool.mal
diff --git a/monetdb5/scheduler/mut_policy.c b/monetdb5/scheduler/mut_policy.c
--- a/monetdb5/scheduler/mut_policy.c
+++ b/monetdb5/scheduler/mut_policy.c
@@ -73,6 +73,8 @@ MUTpolicy(Client cntxt, Mutant m)
continue;
if ( src->profiler[i].ticks/src->calls <= MUT_THRESHOLD)
continue;
+
+
DEBUG_MULTICORE
mnstr_printf(cntxt->fdout,"#mutation candidate %d cost
"LLFMT"\n", i, src->profiler[i].ticks/src->calls);
@@ -82,12 +84,13 @@ MUTpolicy(Client cntxt, Mutant m)
if ( src->profiler[i].ticks/src->calls >
src->profiler[m->target].ticks/src->calls)
m->target = i;
}
+
DEBUG_MULTICORE
if ( src->profiler && m->target) {
- mnstr_printf(cntxt->fdout,"#mutation calls %d cost "LLFMT"\n",
src->calls, src->runtime/src->calls);
-
- mnstr_printf(cntxt->fdout,"#mutation target instruction %d cost
"LLFMT"\n", m->target, src->profiler[m->target].ticks/src->calls);
-
printInstruction(cntxt->fdout,src,0,getInstrPtr(src,m->target),LIST_MAL_ALL);
+ mnstr_printf(cntxt->fdout,"#mutation calls %d cost
"LLFMT"\n", src->calls, src->runtime/src->calls);
+
+ mnstr_printf(cntxt->fdout,"#mutation target instruction
%d cost "LLFMT"\n", m->target, src->profiler[m->target].ticks/src->calls);
+
printInstruction(cntxt->fdout,src,0,getInstrPtr(src,m->target),LIST_MAL_ALL);
}
/* At this point we have a target instruction to be replaced */
/* safe the previous version in the history list */
@@ -98,8 +101,8 @@ MUTpolicy(Client cntxt, Mutant m)
if ( getModuleId(p) && strncmp(getModuleId(p), "algebra",7)== 0)
{
if(getFunctionId(p) == joinRef)
- mutationJoinDouble(cntxt,m);
-// mutationJoin(cntxt,m);
+// mutationJoinDouble(cntxt,m);
+ mutationJoin(cntxt,m);
else if(getFunctionId(p) == subselectRef)
mutationSelect(cntxt,m);
else // proxy function
@@ -110,7 +113,9 @@ MUTpolicy(Client cntxt, Mutant m)
/* reset/expand the profiler */
if (ssize < src->ssize){
- mnstr_printf(GDKstdout, "\n Changed the size of stack");
+
+ DEBUG_MULTICORE
+ mnstr_printf(cntxt->fdout, "# \n Changed the
size of stack");
GDKfree(src->profiler);
src->profiler = 0;
initProfiler(src);
@@ -124,7 +129,7 @@ MUTpolicy(Client cntxt, Mutant m)
DEBUG_MULTICORE
printFunction(cntxt->fdout, src,0,LIST_MAL_ALL);
if ( src->errors)
- throw(MAL,"run_mutation","Internal error");
+ throw(MAL,"run_multicore","Internal error");
}
return MAL_SUCCEED;
}
diff --git a/monetdb5/scheduler/mut_stopRuns.c
b/monetdb5/scheduler/mut_stopRuns.c
--- a/monetdb5/scheduler/mut_stopRuns.c
+++ b/monetdb5/scheduler/mut_stopRuns.c
@@ -39,18 +39,24 @@
int
checkIfRun(MalBlkPtr mb)
{
- // This is the 1st run, so always execute it
Mutant mutant = (Mutant)mb->mutants;
-// if((mb->mutants && mb->mutants->credit==1 && mb->mutants->debit == 0 &&
mb->mutants->next==NULL)
+ // This is the 1st run, so always execute it
if(mutant && mutant->credit==1 && mutant->debit == 0 &&
mutant->next==NULL)
- {
- return 1;
- }else if(mutant && (mutant->credit - mutant->debit) > 0 )
{
return 1;
+
+ // Let the next execution continue, because it is highly likely going
to be normal unlike current noisy
+ }else if(mutant && mutant->isNoisyRun == TRUE)
+ {
+ return 1;
+ // normal execution
+ }else if ((mutant->credit - mutant->debit) > 0 )
+ {
+ return 1;
}else
- return 0;
+ return 0;
+
}
/* Find the rate of increase / decrease in execution time of current query
with respect to the previous query
@@ -67,7 +73,7 @@ checkRateOfFall(Client cntxt, Mutant mut
lng baseTime, diffTime;
lng queryTotalTime, prevQueryTotalTime;
- int NUM_OF_CORES = sysconf(_SC_NPROCESSORS_ONLN) / 2;
+ int NUM_OF_CORES = MT_check_nr_cores() / 2;
if(mutant)
{
@@ -75,7 +81,8 @@ checkRateOfFall(Client cntxt, Mutant mut
prevQueryTotalTime = mutant->next->totalQueryTime;
}else
{
- mnstr_printf(GDKstdout, "\nNext mutant not present");
+ DEBUG_MULTICORE
+ mnstr_printf(cntxt->fdout, "# \nNext mutant not
present");
return 0;
}
@@ -98,22 +105,49 @@ checkRateOfFall(Client cntxt, Mutant mut
{
mutant->debit = (rateOfImprove * NUM_OF_CORES) +
mutant->next->debit;
mutant->credit = mutant->next->credit;
+
+ // The super noisy run condition check, if the exec time of
current run is greater than serial exec time (0th run), then this is a noisy
run, hence set flag which will
+ // allow the conditionCheck checkIfRun to bypass. The
assumption is since this is a super noisy run, the next run will bring back the
exec time to normal, and hence debit and credit
+ // will compensate each other.
+ if(queryTotalTime >= mutant->serialExecTime)
+ mutant->isNoisyRun = TRUE;
+ else
+ mutant->isNoisyRun = FALSE;
+
}
// once the threshold of number of logical cores passed, and global
minimum presence is located change the rate of debit based on different
thresholds
- if(mutant->currentRun > NUM_OF_CORES *2)
- {
- if(mutant->globalMinRun <= NUM_OF_CORES *2)
- mutant->debit = mutant->debit + 0.25;
+ if(mutant->currentRun > NUM_OF_CORES )
+ {
+ if(mutant->currentRun == NUM_OF_CORES + 1)
+ {
+ // The new leaking debit base is spread across the
available credit at this run, so that at least NUM_OF_CORES*2 runs are
guaranteed
+ // This guarantees the debit is dynamic based on credit
accumulation so far, and thereafter static
+ mutant->baseDebit =
(mutant->credit)/((NUM_OF_CORES*2.5) - mutant->currentRun);
+
+ DEBUG_MULTICORE_PRINT_BASEDEBIT
+ mnstr_printf(cntxt->fdout,"#BaseDebit= %f\n",
mutant->baseDebit);
+ }
+ else
+ mutant->baseDebit = mutant->next->baseDebit;
+
+
+ // If the global minimum is found in the initial window, we
ensure atleast runs till NUM_OF_CORES, but if it falls in after Window,
depending on which window it falls in the rate of aggressiveness
+ // of the debit leak is adjusted accordingly
- else if(mutant->globalMinRun > (NUM_OF_CORES *2) &&
mutant->globalMinRun < (NUM_OF_CORES *2)*2)
- mutant->debit = mutant->debit + 0.5;
+ if(mutant->globalMinRun <= NUM_OF_CORES )
+ mutant->debit = mutant->debit + mutant->baseDebit;
+
+ else if(mutant->globalMinRun > NUM_OF_CORES &&
mutant->globalMinRun <= (NUM_OF_CORES *2))
+ mutant->debit = mutant->debit + mutant->baseDebit +
0.25;
+
+ else if(mutant->globalMinRun > (NUM_OF_CORES *2) &&
mutant->globalMinRun <= (NUM_OF_CORES *2)*2)
+ mutant->debit = mutant->debit + mutant->baseDebit +
0.5;
+
+ else if(mutant->globalMinRun > ((NUM_OF_CORES *2)*2) &&
mutant->globalMinRun < (NUM_OF_CORES *2)*4)
+ mutant->debit = mutant->debit + mutant->baseDebit +
0.75;
+ }
- else if(mutant->globalMinRun > ((NUM_OF_CORES *2)*2) &&
mutant->globalMinRun < (NUM_OF_CORES *2)*4)
- mutant->debit = mutant->debit + 0.75;
-
- }
-
- DEBUG_MULTICORE_STOP_RUN
+ DEBUG_MULTICORE_PRINT_BALANCE
mnstr_printf(cntxt->fdout,"#Run- %d Credit- %f Debit- %f
rateOfImprove- %f", mutant->currentRun, mutant->credit, mutant->debit,
rateOfImprove);
return 1;
@@ -124,18 +158,29 @@ checkRateOfFall(Client cntxt, Mutant mut
*/
int checkGlobalMin(Mutant mutant)
{
+ flt prcntDecreaseTimeCur, prcntDecreaseTimePrev;
+
if(mutant->totalQueryTime < mutant->next->globalMinExec)
{
- mutant->globalMinRun = mutant->currentRun;
- mutant->globalMinExec = mutant->totalQueryTime;
+ // Find how much is the percent decrease in time of current run
time with respect to serial exec time
+ // Find how muhc is the percent decrease in time of last global
min with respect to serial exec time
+ // If the increase is greater than a threshold
(globalMinImproveThreshold), then consider present time to be new global min
time
+ prcntDecreaseTimeCur = 100 * ((flt)(mutant->serialExecTime -
mutant->totalQueryTime)/mutant->serialExecTime);
+ prcntDecreaseTimePrev = 100 * ((flt)(mutant->serialExecTime -
mutant->next->globalMinExec)/mutant->serialExecTime);
+
+ if((prcntDecreaseTimeCur - prcntDecreaseTimePrev) >=
globalMinImproveThreshold)
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list