Changeset: a38b3256e2b4 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a38b3256e2b4
Added Files:
monetdb5/optimizer/opt_volcano.c
monetdb5/optimizer/opt_volcano.h
Modified Files:
clients/Tests/MAL-signatures.stable.out
clients/Tests/MAL-signatures.stable.out.int128
clients/Tests/exports.stable.out
monetdb5/mal/mal_dataflow.c
monetdb5/mal/mal_dataflow.h
monetdb5/mal/mal_resource.c
monetdb5/mal/mal_resource.h
monetdb5/mal/mal_runtime.c
monetdb5/mal/mal_runtime.h
monetdb5/modules/mal/language.mal
monetdb5/optimizer/Makefile.ag
monetdb5/optimizer/opt_pipes.c
monetdb5/optimizer/opt_prelude.c
monetdb5/optimizer/opt_prelude.h
monetdb5/optimizer/opt_support.h
monetdb5/optimizer/opt_wrapper.c
monetdb5/optimizer/optimizer.mal
sql/backends/monet5/sql.c
sql/backends/monet5/sql_optimizer.c
sql/test/Tests/setoptimizer.stable.err
sql/test/Tests/setoptimizer.stable.out
sql/test/Tests/setoptimizer.stable.out.Windows
Branch: rdf
Log Message:
Merge with default
diffs (truncated from 656 to 300 lines):
diff --git a/clients/Tests/MAL-signatures.stable.out
b/clients/Tests/MAL-signatures.stable.out
--- a/clients/Tests/MAL-signatures.stable.out
+++ b/clients/Tests/MAL-signatures.stable.out
@@ -38338,6 +38338,10 @@ command language.assert(v:sht,term:str):
address MALassertSht;
command language.assert(v:bit,term:str):void
address MALassertBit;
+pattern language.block(v:int,w:any...):int
+address deblockdataflow;
+comment Block on availability of all variables w, and then pass on v
+
pattern language.call(s:bat[:str]):void
address CMDcallBAT;
comment Evaluate a program stored in a BAT.
@@ -39674,6 +39678,12 @@ comment Collect trace of a specific oper
pattern optimizer.trace():str
address OPTwrapper;
+pattern optimizer.volcano(mod:str,fcn:str):str
+address OPTwrapper;
+comment Simulate volcano style execution
+
+pattern optimizer.volcano():str
+address OPTwrapper;
command pcre.imatch(s:str,pat:str):bit
address PCREimatch;
comment Caseless Perl Compatible Regular Expression pattern matching against a
string
diff --git a/clients/Tests/MAL-signatures.stable.out.int128
b/clients/Tests/MAL-signatures.stable.out.int128
--- a/clients/Tests/MAL-signatures.stable.out.int128
+++ b/clients/Tests/MAL-signatures.stable.out.int128
@@ -49189,6 +49189,10 @@ command language.assert(v:sht,term:str):
address MALassertSht;
command language.assert(v:bit,term:str):void
address MALassertBit;
+pattern language.block(v:int,w:any...):int
+address deblockdataflow;
+comment Block on availability of all variables w, and then pass on v
+
pattern language.call(s:bat[:str]):void
address CMDcallBAT;
comment Evaluate a program stored in a BAT.
@@ -50533,6 +50537,12 @@ comment Collect trace of a specific oper
pattern optimizer.trace():str
address OPTwrapper;
+pattern optimizer.volcano(mod:str,fcn:str):str
+address OPTwrapper;
+comment Simulate volcano style execution
+
+pattern optimizer.volcano():str
+address OPTwrapper;
command pcre.imatch(s:str,pat:str):bit
address PCREimatch;
comment Caseless Perl Compatible Regular Expression pattern matching against a
string
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1251,6 +1251,7 @@ str MALpass(Client cntxt, MalBlkPtr mb,
str MALpipeline(Client c);
str MALreader(Client c);
void MALresourceFairness(lng usec);
+int MALrunningThreads(void);
str MALstartDataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
str MANIFOLDevaluate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
str MANIFOLDremapMultiplex(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
p);
@@ -1540,6 +1541,7 @@ int OPTremapImplementation(Client cntxt,
int OPTremoteQueriesImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
int OPTreorderImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr p);
str OPTsetDebugStr(void *ret, str *nme);
+int OPTvolcanoImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr p);
str OPTwrapper(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
str PCREilike2(bit *ret, const str *s, const str *pat);
str PCREilike3(bit *ret, const str *s, const str *pat, const str *esc);
@@ -1933,6 +1935,7 @@ str bindRef;
str binddbatRef;
str bindidxRef;
var_t blobsize(size_t nitems);
+str blockRef;
str bpmRef;
str bstreamRef;
int bstream_create_wrap(Bstream *BS, Stream *S, int *bufsize);
@@ -1993,6 +1996,7 @@ int daytime_fromstr(const char *buf, int
int daytime_tostr(str *buf, int *len, const daytime *val);
int daytime_tz_fromstr(const char *buf, int *len, daytime **ret);
str dblRef;
+str deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
void debugFunction(stream *fd, MalBlkPtr mb, MalStkPtr stk, int flg, int
first, int size);
void debugLifespan(Client cntxt, MalBlkPtr mb, Lifespan span);
str debugOptimizers(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
@@ -2057,6 +2061,7 @@ str generatorRef;
MALfcn getAddress(stream *out, str filename, str modnme, str fcnname, int
silent);
str getArgDefault(MalBlkPtr mb, InstrPtr p, int idx);
ptr getArgReference(MalStkPtr stk, InstrPtr pci, int k);
+lng getBatSpace(BAT *b);
int getBitConstant(MalBlkPtr mb, bit val);
int getBlockBegin(MalBlkPtr mb, int pc);
int getBlockExit(MalBlkPtr mb, int pc);
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -380,13 +380,17 @@ DFLOWworker(void *T)
MT_lock_unset(&flow->flowlock);
#ifdef USE_MAL_ADMISSION
- if (MALadmission(fe->argclaim, fe->hotclaim)) {
- fe->hotclaim = 0; /* don't assume priority anymore */
- fe->maxclaim = 0;
- if (todo->last == 0)
- MT_sleep_ms(DELAYUNIT);
- q_requeue(todo, fe);
- continue;
+ if (MALrunningThreads() > 2 && MALadmission(fe->argclaim,
fe->hotclaim)) {
+ // never block on deblockdataflow()
+ p= getInstrPtr(flow->mb,fe->pc);
+ if( p->fcn != (MALfcn) deblockdataflow){
+ fe->hotclaim = 0; /* don't assume priority
anymore */
+ fe->maxclaim = 0;
+ if (todo->last == 0)
+ MT_sleep_ms(DELAYUNIT);
+ q_requeue(todo, fe);
+ continue;
+ }
}
#endif
error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc +
1, flow->stk, 0, 0);
@@ -931,6 +935,17 @@ runMALdataflow(Client cntxt, MalBlkPtr m
return msg;
}
+/*
+ * Implementation behind the MAL pattern language.block(v:int,w:any...):int.
+ * Copies the int argument in position 1 into the result in position 0;
+ * no other argument is read here.  Always returns MAL_SUCCEED.
+ */
+str
+deblockdataflow( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+	int *src = getArgReference_int(stk, pci, 1);
+	int *dst = getArgReference_int(stk, pci, 0);
+
+	(void) mb;	/* unused */
+	(void) cntxt;	/* unused */
+	dst[0] = src[0];
+	return MAL_SUCCEED;
+}
+
void
stopMALdataflow(void)
{
diff --git a/monetdb5/mal/mal_dataflow.h b/monetdb5/mal/mal_dataflow.h
--- a/monetdb5/mal/mal_dataflow.h
+++ b/monetdb5/mal/mal_dataflow.h
@@ -13,5 +13,6 @@
#include "mal_client.h"
mal_export str runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int
stoppc, MalStkPtr stk);
+mal_export str deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
#endif /* _MAL_DATAFLOW_H*/
diff --git a/monetdb5/mal/mal_resource.c b/monetdb5/mal/mal_resource.c
--- a/monetdb5/mal/mal_resource.c
+++ b/monetdb5/mal/mal_resource.c
@@ -206,7 +206,7 @@ MALresourceFairness(lng usec)
if (rss < MEMORY_THRESHOLD )
break;
threads = GDKnr_threads > 0 ? GDKnr_threads : 1;
- delay = (unsigned int) ( ((double)DELAYUNIT * running)
/ threads);
+ delay = (unsigned int) ( ((double)DELAYUNIT * running)
/ threads) + 1;
if (delay) {
if ( delayed++ == 0){
PARDEBUG
mnstr_printf(GDKstdout, "#delay initial %u["LLFMT"] memory "SZFMT"[%f]\n",
delay, clk, rss, MEMORY_THRESHOLD );
@@ -221,6 +221,13 @@ MALresourceFairness(lng usec)
}
}
+/*
+ * Get a hint on the parallel behavior: hand back the current value of
+ * 'running', the same quantity MALresourceFairness() scales its delay by.
+ */
+int
+MALrunningThreads(void)
+{
+	int active = running;
+
+	return active;
+}
+
void
initResource(void)
{
diff --git a/monetdb5/mal/mal_resource.h b/monetdb5/mal/mal_resource.h
--- a/monetdb5/mal/mal_resource.h
+++ b/monetdb5/mal/mal_resource.h
@@ -12,7 +12,7 @@
#include "mal_interpreter.h"
#define TIMESLICE 2000000 /* usec */
-#define DELAYUNIT 5 /* ms delay in parallel processing decisions */
+#define DELAYUNIT 2 /* ms delay in parallel processing decisions */
#define MAX_DELAYS 1000 /* never wait forever */
#define USE_MAL_ADMISSION
@@ -22,5 +22,6 @@ mal_export int MALadmission(lng argclaim
mal_export lng getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int
i, int flag);
mal_export void MALresourceFairness(lng usec);
+mal_export int MALrunningThreads(void);
#endif /* _MAL_RESOURCE_H*/
diff --git a/monetdb5/mal/mal_runtime.c b/monetdb5/mal/mal_runtime.c
--- a/monetdb5/mal/mal_runtime.c
+++ b/monetdb5/mal/mal_runtime.c
@@ -22,7 +22,7 @@
#include "mal_private.h"
#define heapinfo(X) ((X) && (X)->base ? (X)->free: 0)
-#define hashinfo(X) (((X) && (X) != (Hash *) 1 && (X)->mask)? ((X)->mask +
(X)->lim + 1) * sizeof(int) + sizeof(*(X)) + cnt * sizeof(int): 0)
+#define hashinfo(X) ( (X)? heapinfo((X)->heap):0)
// Keep a queue of running queries
QueryQueue QRYqueue;
@@ -212,6 +212,19 @@ runtimeProfileExit(Client cntxt, MalBlkP
* may trigger a side effect, such as creating a hash-index.
* Side effects are ignored.
*/
+
+/*
+ * Space estimate for BAT b.
+ * Adds heapinfo() of the tail heap and of the variable-sized heap (when
+ * one is attached), hashinfo() of the tail hash, and IMPSimprintsize(b).
+ * Returns 0 when b is NULL.
+ */
+lng
+getBatSpace(BAT *b){
+	lng space=0;
+	if( b == NULL)
+		return 0;
+	/* Keep every b->T access under a single guard: the vheap test used to
+	 * dereference b->T unconditionally although the statements around it
+	 * allowed for b->T being NULL. */
+	if( b->T){
+		space += heapinfo(&b->T->heap);
+		if( b->T->vheap) space += heapinfo(b->T->vheap);
+		space += hashinfo(b->T->hash);
+	}
+	/* NOTE(review): still called unconditionally, as before; confirm that
+	 * IMPSimprintsize() tolerates a BAT without a tail column. */
+	space += IMPSimprintsize(b);
+	return space;
+}
+
lng getVolume(MalStkPtr stk, InstrPtr pci, int rd)
{
int i, limit;
diff --git a/monetdb5/mal/mal_runtime.h b/monetdb5/mal/mal_runtime.h
--- a/monetdb5/mal/mal_runtime.h
+++ b/monetdb5/mal/mal_runtime.h
@@ -41,6 +41,7 @@ mal_export void runtimeProfileBegin(Clie
mal_export void runtimeProfileExit(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci, RuntimeProfile prof);
mal_export void finishSessionProfiler(Client cntxt);
mal_export lng getVolume(MalStkPtr stk, InstrPtr pci, int rd);
+mal_export lng getBatSpace(BAT *b);
mal_export QueryQueue QRYqueue;
#endif
diff --git a/monetdb5/modules/mal/language.mal
b/monetdb5/modules/mal/language.mal
--- a/monetdb5/modules/mal/language.mal
+++ b/monetdb5/modules/mal/language.mal
@@ -51,6 +51,10 @@ pattern pass(v:any_1)
address MALpass
comment "Cheap instruction to disgard storage while retaining the dataflow
dependency";
+pattern block(v:int,w:any...):int
+address deblockdataflow
+comment "Block on availability of all variables w, and then pass on v";
+
pattern register(m:str,f:str,code:str,help:str):void
address CMDregisterFunction
comment"Compile the code string to MAL and register it as a function.";
diff --git a/monetdb5/optimizer/Makefile.ag b/monetdb5/optimizer/Makefile.ag
--- a/monetdb5/optimizer/Makefile.ag
+++ b/monetdb5/optimizer/Makefile.ag
@@ -51,6 +51,7 @@ lib_optimizer = {
opt_support.c opt_support.h \
opt_pushselect.c opt_pushselect.h \
opt_profiler.c opt_profiler.h \
+ opt_volcano.c opt_volcano.h \
opt_wrapper.c
}
diff --git a/monetdb5/optimizer/opt_pipes.c b/monetdb5/optimizer/opt_pipes.c
--- a/monetdb5/optimizer/opt_pipes.c
+++ b/monetdb5/optimizer/opt_pipes.c
@@ -90,6 +90,37 @@ static struct PIPELINES {
"optimizer.profiler();"
"optimizer.garbageCollector();",
"stable", NULL, NULL, 1},
+/*
+ * Volcano style execution produces a sequence of blocks from the source
relation
+ */
+ {"volcano_pipe",
+ "optimizer.inline();"
+ "optimizer.candidates();"
+ "optimizer.remap();"
+ "optimizer.costModel();"
+ "optimizer.coercions();"
+ "optimizer.evaluate();"
+ "optimizer.aliases();"
+ "optimizer.pushselect();"
+ "optimizer.mitosis();"
+ "optimizer.mergetable();"
+ "optimizer.deadcode();"
+ "optimizer.aliases();"
+ "optimizer.constants();"
+ "optimizer.commonTerms();"
+ "optimizer.projectionpath();"
+ "optimizer.reorder();"
+ "optimizer.deadcode();"
+ "optimizer.reduce();"
+ "optimizer.matpack();"
+ "optimizer.dataflow();"
+ "optimizer.querylog();"
+ "optimizer.multiplex();"
+ "optimizer.generator();"
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list