Changeset: 660f837ffe70 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=660f837ffe70
Removed Files:
sql/backends/monet5/iot/Tests/iot01.sql
Modified Files:
monetdb5/mal/mal_instruction.c
monetdb5/optimizer/opt_iot.c
sql/backends/monet5/iot/50_iot.sql
sql/backends/monet5/iot/Makefile.am
sql/backends/monet5/iot/Tests/All
sql/backends/monet5/iot/Tests/cleanup.sql
sql/backends/monet5/iot/Tests/iot00.sql
sql/backends/monet5/iot/Tests/iot02.sql
sql/backends/monet5/iot/Tests/iot03.sql
sql/backends/monet5/iot/Tests/iot10.sql
sql/backends/monet5/iot/Tests/iot12.sql
sql/backends/monet5/iot/Tests/iot15.sql
sql/backends/monet5/iot/Tests/receptor01.sql
sql/backends/monet5/iot/basket.c
sql/backends/monet5/iot/basket.h
sql/backends/monet5/iot/basket.mal
sql/backends/monet5/iot/iot.c
sql/backends/monet5/iot/iot.h
sql/backends/monet5/iot/iot.mal
sql/backends/monet5/iot/petrinet.c
sql/backends/monet5/iot/petrinet.h
sql/backends/monet5/iot/petrinet.mal
sql/backends/monet5/sql_optimizer.c
sql/backends/monet5/sql_scenario.c
Branch: iot
Log Message:
Intermittent commit
diffs (truncated from 1884 to 300 lines):
diff --git a/monetdb5/mal/mal_instruction.c b/monetdb5/mal/mal_instruction.c
--- a/monetdb5/mal/mal_instruction.c
+++ b/monetdb5/mal/mal_instruction.c
@@ -207,6 +207,7 @@ freeMalBlk(MalBlkPtr mb)
if (mb->history)
freeMalBlk(mb->history);
+ mb->history = 0;
if (mb->binding)
GDKfree(mb->binding);
mb->binding = 0;
diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c
--- a/monetdb5/optimizer/opt_iot.c
+++ b/monetdb5/optimizer/opt_iot.c
@@ -34,7 +34,7 @@
#include "opt_dataflow.h"
#define MAXBSKT 64
-#define isstream(S,T) \
+#define getStreamTableInfo(S,T) \
for(fnd=0, k= 0; k< btop; k++) \
if( strcmp(schemas[k], S)== 0 && strcmp(tables[k], T)== 0 ){ \
fnd= 1; break;\
@@ -50,13 +50,18 @@ OPTiotImplementation(Client cntxt, MalBl
str tables[MAXBSKT];
int mvc[MAXBSKT];
int done[MAXBSKT]= {0};
- int btop=0;
+ int btop=0, lastmvc;
int noerror=0;
- int cq;
+ int cq= strncmp(getFunctionId(getInstrPtr(mb,0)),"cq",2) == 0;
+ char buf[256];
+ lng usec = GDKusec();
(void) pci;
- cq= strncmp(getFunctionId(getInstrPtr(mb,0)),"cq",2) == 0;
+ OPTDEBUGiot {
+ mnstr_printf(cntxt->fdout, "#iot optimizer start\n");
+ printFunction(cntxt->fdout, mb, stk, LIST_MAL_DEBUG);
+ }
old = mb->stmt;
limit = mb->stop;
slimit = mb->ssize;
@@ -64,29 +69,27 @@ OPTiotImplementation(Client cntxt, MalBl
/* first analyse the query for streaming tables */
for (i = 1; i < limit && btop <MAXBSKT; i++){
p = old[i];
- if( getModuleId(p)== basketRef && (getFunctionId(p)==
registerRef || getFunctionId(p)== bindRef || getFunctionId(p)== clear_tableRef)
){
+ if( getModuleId(p)== basketRef && (getFunctionId(p)==
registerRef || getFunctionId(p)== bindRef ) ){
OPTDEBUGiot mnstr_printf(cntxt->fdout, "#iot stream
table %s.%s\n", getModuleId(p), getFunctionId(p));
- schemas[btop]= getVarConstant(mb, getArg(p,1)).val.sval;
- tables[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
- mvc[btop] = getArg(p,0);
+ schemas[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
+ tables[btop]= getVarConstant(mb, getArg(p,3)).val.sval;
for( j =0; j< btop ; j++)
if( strcmp(schemas[j], schemas[j+1])==0 &&
strcmp(tables[j],tables[j+1]) ==0)
break;
- mvc[j] = getArg(p,0);
+ lastmvc = mvc[j] = getArg(p,0);
done[j]= done[j] || getFunctionId(p)== registerRef;
if( j == btop)
btop++;
}
- if( getModuleId(p)== basketRef && (getFunctionId(p) ==
appendRef || getFunctionId(p) == deleteRef )){
+ if( getModuleId(p)== basketRef && getFunctionId(p) == appendRef
){
OPTDEBUGiot mnstr_printf(cntxt->fdout, "#iot stream
table %s.%s\n", getModuleId(p), getFunctionId(p));
schemas[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
tables[btop]= getVarConstant(mb, getArg(p,3)).val.sval;
- mvc[btop] = getArg(p,0);
for( j =0; j< btop ; j++)
if( strcmp(schemas[j], schemas[j+1])==0 &&
strcmp(tables[j],tables[j+1]) ==0)
break;
- mvc[j] = getArg(p,0);
+ lastmvc = mvc[j] = getArg(p,0);
if( j == btop)
btop++;
}
@@ -100,12 +103,19 @@ OPTiotImplementation(Client cntxt, MalBl
if( j == btop)
btop++;
}
+ if( getModuleId(p)== sqlRef && getFunctionId(p) == appendRef )
+ lastmvc = getArg(p,0);
+ if (!cq && getModuleId(p) == sqlRef && getFunctionId(p) ==
affectedRowsRef )
+ lastmvc = getArg(p,0);
+ if( getModuleId(p)== iotRef && getFunctionId(p) == tumbleRef){
+ lastmvc = getArg(p,1);
+ }
}
if( btop == MAXBSKT || btop == 0)
return 0;
OPTDEBUGiot {
- mnstr_printf(cntxt->fdout, "#iot optimizer started\n");
+ mnstr_printf(cntxt->fdout, "#iot optimizer started with %d
streams, mvc %d\n", btop,lastmvc);
printFunction(cntxt->fdout, mb, stk, LIST_MAL_DEBUG);
}
(void) stk;
@@ -118,13 +128,6 @@ OPTiotImplementation(Client cntxt, MalBl
return 0;
pushInstruction(mb, old[0]);
- // register all baskets used
- for( j=0; j<btop; j++)
- if( done[j]==0) {
- p= newStmt(mb,basketRef,registerRef);
- p= pushStr(mb,p, schemas[j]);
- p= pushStr(mb,p, tables[j]);
- }
for (i = 1; i < limit; i++)
if (old[i]) {
p = old[i];
@@ -133,13 +136,30 @@ OPTiotImplementation(Client cntxt, MalBl
freeInstruction(p);
continue;
}
+ if(getModuleId(p) == sqlRef && getFunctionId(p)==
mvcRef){
+ pushInstruction(mb,p);
+ k= getArg(p,0);
+ // register all baskets used
+ for( j=0; j<btop; j++)
+ if( done[j]==0) {
+ p= newStmt(mb,basketRef,registerRef);
+ p= pushArgument(mb,p,k);
+ p= pushStr(mb,p, schemas[j]);
+ p= pushStr(mb,p, tables[j]);
+ alias[k] = getArg(p,0);
+ }
+ continue;
+ }
+ // register all baskets used after the mvc had been
determined
if (getModuleId(p) == sqlRef && getFunctionId(p) ==
tidRef ){
-
isstream(getVarConstant(mb,getArg(p,2)).val.sval,
getVarConstant(mb,getArg(p,3)).val.sval );
+
getStreamTableInfo(getVarConstant(mb,getArg(p,2)).val.sval,
getVarConstant(mb,getArg(p,3)).val.sval );
+ OPTDEBUGiot
+ mnstr_printf(cntxt->fdout, "#iot
optimizer found stream %d\n",fnd);
if( fnd){
alias[getArg(p,0)] = -1;
freeInstruction(p);
+ continue;
}
- continue;
}
if (getModuleId(p) == algebraRef && getFunctionId(p) ==
projectionRef && alias[getArg(p,1)] < 0){
alias[getArg(p,0)] = getArg(p,2);
@@ -148,16 +168,10 @@ OPTiotImplementation(Client cntxt, MalBl
}
if (getModuleId(p) == sqlRef && getFunctionId(p) ==
affectedRowsRef ){
- for(j = 0; j < btop; j++){
- r = newStmt(mb, basketRef, commitRef);
- if (alias[mvc[j]] > 0)
- r = pushArgument(mb,r,
alias[mvc[j]]);
- else
- r = pushArgument(mb,r, mvc[j]);
- r = pushStr(mb,r, schemas[j]);
- r = pushStr(mb,r, tables[j]);
- }
- freeInstruction(p);
+ if(cq)
+ freeInstruction(p);
+ else
+ pushInstruction(mb,p);
continue;
}
@@ -165,9 +179,17 @@ OPTiotImplementation(Client cntxt, MalBl
noerror++;
if (p->token == ENDsymbol && btop > 0 && noerror==0) {
// empty all baskets used only when we are
optimizing a cq
- for( j=0; cq && j<btop; j++)
+ for(j = 0; cq && j < btop; j++){
+ r = newStmt(mb, basketRef, tumbleRef);
+ r = pushArgument(mb,r, lastmvc);
+ r = pushStr(mb,r, schemas[j]);
+ r = pushStr(mb,r, tables[j]);
+ }
+ /* non-contiguous queries call for releasing
the lock on the basket */
+ for( j=0; !cq && j<btop; j++)
if( done[j]==0) {
- p= newStmt(mb,iotRef,tumbleRef);
+ p= newStmt(mb,basketRef,commitRef);
+ p= pushArgument(mb,p, lastmvc);
p= pushStr(mb,p, schemas[j]);
p= pushStr(mb,p, tables[j]);
}
@@ -208,7 +230,7 @@ OPTiotImplementation(Client cntxt, MalBl
getArg(p, j) = alias[getArg(p, j)];
if (getModuleId(p) == sqlRef && getFunctionId(p) ==
appendRef ){
-
isstream(getVarConstant(mb,getArg(p,2)).val.sval,
getVarConstant(mb,getArg(p,3)).val.sval );
+
//getStreamTableInfo(getVarConstant(mb,getArg(p,3)).val.sval,
getVarConstant(mb,getArg(p,4)).val.sval );
/* the appends come in multiple steps.
The first initializes an basket update
statement,
which is triggered when we commit the
transaction.
@@ -226,12 +248,16 @@ OPTiotImplementation(Client cntxt, MalBl
chkTypes(cntxt->fdout, cntxt->nspace, mb, FALSE);
chkFlow(cntxt->fdout, mb);
chkDeclarations(cntxt->fdout, mb);
+ /* keep all actions taken as a post block comment */
+ snprintf(buf,256,"%-20s actions=%2d time=" LLFMT " usec","iot", btop,
GDKusec() - usec);
+ newComment(mb,buf);
OPTDEBUGiot {
mnstr_printf(cntxt->fdout, "#iot optimizer final\n");
printFunction(cntxt->fdout, mb, stk, LIST_MAL_DEBUG);
}
GDKfree(alias);
+ GDKfree(old);
return btop > 0;
}
diff --git a/sql/backends/monet5/iot/50_iot.sql
b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -24,17 +24,20 @@ create procedure iot.query(qry string)
create procedure iot.query("schema" string, name string)
external name iot.query;
-create procedure iot.activate("schema" string, name string)
- external name iot.activate;
+create procedure iot.resume("schema" string, name string)
+ external name iot.resume;
-create procedure iot.activate()
- external name iot.activate;
+create procedure iot.resume()
+ external name iot.resume;
-create procedure iot.deactivate("schema" string, name string)
- external name iot.deactivate;
+create procedure iot.pause("schema" string, name string)
+ external name iot.pause;
-create procedure iot.deactivate()
- external name iot.deactivate;
+create procedure iot.pause()
+ external name iot.pause;
+
+create procedure iot.wait(cycles integer)
+ external name iot.wait;
create procedure iot.deregister("schema" string, name string)
external name iot.deregister;
@@ -61,17 +64,10 @@ create procedure iot.emitter("schema" st
external name iot.emitter;
-create procedure iot.threshold("schema" string, "table" string, elem int)
- external name iot.threshold;
-
-create procedure iot.beat("schema" string, "table" string, msec int)
- external name iot.beat;
+create procedure iot.heartbeat("schema" string, "table" string, msec int)
+ external name iot.heartbeat;
-- cleaup activities
-create procedure iot.tumble()
- external name iot.tumble;
-create procedure iot.tumble("schema" string, "table" string)
- external name iot.tumble;
create procedure iot.tumble("schema" string, "table" string, elem int)
external name iot.tumble;
@@ -81,11 +77,9 @@ create procedure iot.window("schema" str
create procedure iot.window("schema" string, "table" string, elem int, slide
int)
external name iot.window;
-create procedure iot.wait() external name iot.wait;
-
-- Inspection tables
create function iot.baskets()
-returns table( "schema" string, "table" string, "status" string, threshold
int, winsize int, winstride int, timeslice int, timestride int, beat int, seen
timestamp, events int)
+returns table( "schema" string, "table" string, "status" string, threshold
int, winsize int, winstride int, timeslice int, timestride int, heartbeat int,
seen timestamp, events int)
external name iot.baskets;
create function iot.queries()
diff --git a/sql/backends/monet5/iot/Makefile.am
b/sql/backends/monet5/iot/Makefile.am
--- a/sql/backends/monet5/iot/Makefile.am
+++ b/sql/backends/monet5/iot/Makefile.am
@@ -13,9 +13,9 @@ install-exec-local-50_iot.mal: 50_iot.ma
uninstall-local-50_iot.mal:
$(RM) $(DESTDIR)$(libdir)/monetdb5/autoload/50_iot.mal
-iot.o iot.lo: iot.c iot.h ../../../../monetdb5/mal/../../gdk/gdk.h
../../../../monetdb5/mal/mal.h ../../../../monetdb5/mal/mal_interpreter.h
../sql.h ../sql_scenario.h basket.h petrinet.h
../../../../monetdb5/optimizer/opt_prelude.h
../../../../monetdb5/optimizer/opt_support.h
../../../../monetdb5/optimizer/../mal/mal.h
../../../../monetdb5/optimizer/../mal/mal_function.h
../../../../monetdb5/optimizer/../mal/mal_scenario.h
../../../../monetdb5/optimizer/../mal/mal_builder.h
../../../../monetdb5/optimizer/opt_pipes.h
../../../../monetdb5/optimizer/opt_iot.h ../sql_optimizer.h ../sql_gencode.h
-petrinet.o petrinet.lo: petrinet.c iot.h
../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h
../../../../monetdb5/mal/mal_interpreter.h ../sql.h ../sql_scenario.h basket.h
petrinet.h ../../../../monetdb5/mal/mal_builder.h
../../../../monetdb5/optimizer/opt_support.h
../../../../monetdb5/optimizer/../mal/mal.h
../../../../monetdb5/optimizer/../mal/mal_function.h
../../../../monetdb5/optimizer/../mal/mal_scenario.h
../../../../monetdb5/optimizer/../mal/mal_builder.h
../../../../monetdb5/optimizer/opt_prelude.h
-basket.o basket.lo: basket.c ../../../../gdk/gdk.h iot.h
../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h
../../../../monetdb5/mal/mal_interpreter.h ../sql.h ../sql_scenario.h basket.h
petrinet.h ../../../../monetdb5/mal/mal_instruction.h
../../../../monetdb5/mal/mal_type.h ../../../../monetdb5/mal/mal_stack.h
../../../../monetdb5/mal/mal_namespace.h ../../../../monetdb5/mal/mal_errors.h
../../../../monetdb5/mal/mal_exception.h ../../../../monetdb5/mal/mal_builder.h
../../../../monetdb5/optimizer/opt_support.h
../../../../monetdb5/optimizer/../mal/mal.h
../../../../monetdb5/optimizer/../mal/mal_function.h
../../../../monetdb5/optimizer/../mal/mal_scenario.h
../../../../monetdb5/optimizer/../mal/mal_builder.h
../../../../monetdb5/optimizer/opt_prelude.h
+iot.o iot.lo: iot.c ../sql_optimizer.h ../sql_gencode.h
../../../../monetdb5/optimizer/opt_prelude.h
../../../../monetdb5/optimizer/opt_support.h
../../../../monetdb5/optimizer/../mal/mal.h
../../../../monetdb5/optimizer/../mal/mal_function.h
../../../../monetdb5/optimizer/../mal/mal_scenario.h
../../../../monetdb5/optimizer/../mal/mal_builder.h
../../../../monetdb5/optimizer/opt_pipes.h
../../../../monetdb5/optimizer/opt_iot.h basket.h
../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h
../../../../monetdb5/mal/mal_interpreter.h ../sql.h iot.h ../sql_scenario.h
petrinet.h
+petrinet.o petrinet.lo: petrinet.c iot.h
../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h
../../../../monetdb5/mal/mal_interpreter.h ../sql.h ../sql_scenario.h
petrinet.h basket.h ../../../../monetdb5/mal/mal_builder.h
../../../../monetdb5/optimizer/opt_support.h
../../../../monetdb5/optimizer/../mal/mal.h
../../../../monetdb5/optimizer/../mal/mal_function.h
../../../../monetdb5/optimizer/../mal/mal_scenario.h
../../../../monetdb5/optimizer/../mal/mal_builder.h
../../../../monetdb5/optimizer/opt_prelude.h
+basket.o basket.lo: basket.c ../../../../gdk/gdk.h iot.h
../../../../monetdb5/mal/../../gdk/gdk.h ../../../../monetdb5/mal/mal.h
../../../../monetdb5/mal/mal_interpreter.h ../sql.h ../sql_scenario.h basket.h
../../../../monetdb5/mal/mal_instruction.h ../../../../monetdb5/mal/mal_type.h
../../../../monetdb5/mal/mal_stack.h ../../../../monetdb5/mal/mal_namespace.h
../../../../monetdb5/mal/mal_errors.h ../../../../monetdb5/mal/mal_exception.h
../../../../monetdb5/mal/mal_builder.h
../../../../monetdb5/optimizer/opt_support.h
../../../../monetdb5/optimizer/../mal/mal.h
../../../../monetdb5/optimizer/../mal/mal_function.h
../../../../monetdb5/optimizer/../mal/mal_scenario.h
../../../../monetdb5/optimizer/../mal/mal_builder.h
../../../../monetdb5/optimizer/opt_prelude.h
install-exec-local-basket.mal: basket.mal
-mkdir -p $(DESTDIR)$(libdir)/monetdb5
-$(RM) $(DESTDIR)$(libdir)/monetdb5/basket.mal
@@ -53,11 +53,11 @@ lib_iot_la_CFLAGS=-DLIBIOT $(AM_CFLAGS)
iotdir = $(libdir)/monetdb5
lib_iot_la_LIBADD = ../../../../monetdb5/tools/libmonetdb5.la
../../../../gdk/libbat.la
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list