Changeset: 0263be48f587 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0263be48f587
Modified Files:
monetdb5/mal/mal_dataflow.c
monetdb5/mal/mal_dataflow.h
monetdb5/mal/mal_resource.c
monetdb5/mal/mal_resource.h
Branch: default
Log Message:
Add dataflow deblocking instruction
It should alway be allowed to enter the queue,
even if we have resource constraints.
Alse a minimal number of workers should be
active before resource control kicks in.
diffs (101 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -380,13 +380,17 @@ DFLOWworker(void *T)
MT_lock_unset(&flow->flowlock);
#ifdef USE_MAL_ADMISSION
- if (MALadmission(fe->argclaim, fe->hotclaim)) {
- fe->hotclaim = 0; /* don't assume priority anymore */
- fe->maxclaim = 0;
- if (todo->last == 0)
- MT_sleep_ms(DELAYUNIT);
- q_requeue(todo, fe);
- continue;
+ if (MALrunningThreads() > 2 && MALadmission(fe->argclaim,
fe->hotclaim)) {
+ // never block on deblockdataflow()
+ p= getInstrPtr(flow->mb,fe->pc);
+ if( p->fcn != (MALfcn) deblockdataflow){
+ fe->hotclaim = 0; /* don't assume priority
anymore */
+ fe->maxclaim = 0;
+ if (todo->last == 0)
+ MT_sleep_ms(DELAYUNIT);
+ q_requeue(todo, fe);
+ continue;
+ }
}
#endif
error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc +
1, flow->stk, 0, 0);
@@ -931,6 +935,17 @@ runMALdataflow(Client cntxt, MalBlkPtr m
return msg;
}
+str
+deblockdataflow( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ int *ret = getArgReference_int(stk,pci,0);
+ int *val = getArgReference_int(stk,pci,1);
+ (void) cntxt;
+ (void) mb;
+ *ret = *val;
+ return MAL_SUCCEED;
+}
+
void
stopMALdataflow(void)
{
diff --git a/monetdb5/mal/mal_dataflow.h b/monetdb5/mal/mal_dataflow.h
--- a/monetdb5/mal/mal_dataflow.h
+++ b/monetdb5/mal/mal_dataflow.h
@@ -13,5 +13,6 @@
#include "mal_client.h"
mal_export str runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int
stoppc, MalStkPtr stk);
+mal_export str deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
#endif /* _MAL_DATAFLOW_H*/
diff --git a/monetdb5/mal/mal_resource.c b/monetdb5/mal/mal_resource.c
--- a/monetdb5/mal/mal_resource.c
+++ b/monetdb5/mal/mal_resource.c
@@ -206,7 +206,7 @@ MALresourceFairness(lng usec)
if (rss < MEMORY_THRESHOLD )
break;
threads = GDKnr_threads > 0 ? GDKnr_threads : 1;
- delay = (unsigned int) ( ((double)DELAYUNIT * running)
/ threads);
+ delay = (unsigned int) ( ((double)DELAYUNIT * running)
/ threads) + 1;
if (delay) {
if ( delayed++ == 0){
PARDEBUG
mnstr_printf(GDKstdout, "#delay initial %u["LLFMT"] memory "SZFMT"[%f]\n",
delay, clk, rss, MEMORY_THRESHOLD );
@@ -221,6 +221,13 @@ MALresourceFairness(lng usec)
}
}
+// Get a hint on the parallel behavior
+int
+MALrunningThreads(void)
+{
+ return running;
+}
+
void
initResource(void)
{
diff --git a/monetdb5/mal/mal_resource.h b/monetdb5/mal/mal_resource.h
--- a/monetdb5/mal/mal_resource.h
+++ b/monetdb5/mal/mal_resource.h
@@ -12,7 +12,7 @@
#include "mal_interpreter.h"
#define TIMESLICE 2000000 /* usec */
-#define DELAYUNIT 5 /* ms delay in parallel processing decisions */
+#define DELAYUNIT 2 /* ms delay in parallel processing decisions */
#define MAX_DELAYS 1000 /* never wait forever */
#define USE_MAL_ADMISSION
@@ -22,5 +22,6 @@ mal_export int MALadmission(lng argclaim
mal_export lng getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int
i, int flag);
mal_export void MALresourceFairness(lng usec);
+mal_export int MALrunningThreads(void);
#endif /* _MAL_RESOURCE_H*/
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list