Changeset: 8565008b21ce for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8565008b21ce
Modified Files:
monetdb5/optimizer/opt_iot.c
monetdb5/optimizer/opt_prelude.c
monetdb5/optimizer/opt_prelude.h
monetdb5/optimizer/opt_support.c
sql/backends/monet5/iot/Tests/iot00.sql
sql/backends/monet5/iot/basket.c
sql/backends/monet5/iot/basket.h
sql/backends/monet5/iot/iot.c
sql/backends/monet5/iot/iot.mal
sql/backends/monet5/iot/petrinet.c
sql/backends/monet5/iot/petrinet.h
sql/backends/monet5/iot/petrinet.mal
sql/backends/monet5/sql_optimizer.c
Branch: iot
Log Message:
Intermittent commit
diffs (truncated from 889 to 300 lines):
diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c
--- a/monetdb5/optimizer/opt_iot.c
+++ b/monetdb5/optimizer/opt_iot.c
@@ -33,34 +33,55 @@
#include "opt_statistics.h"
#include "opt_dataflow.h"
+#define MAXBSKT 64
+#define isstream(S,T) \
+ for(fnd=0, k= 0; k< btop; k++) \
+ if( strcmp(schemas[k], S)== 0 && strcmp(tables[k], T)== 0 ){ \
+ fnd= 1; break;\
+ }
+
int
OPTiotImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
- int actions = 0, mvc=0;
- int i, j, limit, slimit;
+ int mvc=0;
+ int i, j, k, fnd, limit, slimit;
InstrPtr r, p, *old;
- int movetofront=0;
int *alias;
- char *tidlist;
+ str schemas[MAXBSKT];
+ str tables[MAXBSKT];
+ int btop=0;
(void) pci;
(void) mvc;
+ old = mb->stmt;
+ limit = mb->stop;
+ slimit = mb->ssize;
+
+ /* first analyse the query for streaming tables */
+ for (i = 1; i < limit && btop <MAXBSKT; i++){
+ p = old[i];
+ if( getModuleId(p)== basketRef && getFunctionId(p)== registerRef ){
+ OPTDEBUGiot mnstr_printf(cntxt->fdout, "#iot stream table %s.%s\n", getModuleId(p), getFunctionId(p));
+ schemas[btop]= getVarConstant(mb, getArg(p,1)).val.sval;
+ tables[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
+ btop++;
+ }
+ }
+ if( btop == MAXBSKT || btop == 0)
+ return 0;
+
OPTDEBUGiot {
mnstr_printf(cntxt->fdout, "#iot optimizer started\n");
printFunction(cntxt->fdout, mb, stk, LIST_MAL_DEBUG);
} else
(void) stk;
- old = mb->stmt;
- limit = mb->stop;
- slimit = mb->ssize;
- if (newMalBlkStmt(mb, slimit) < 0)
+ alias = (int *) GDKzalloc(mb->vtop * 2 * sizeof(int));
+ if (alias == 0)
return 0;
- alias = (int *) GDKzalloc(mb->vtop * 2 * sizeof(int));
- tidlist = (char *) GDKzalloc(mb->vtop );
- if (alias == 0)
+ if (newMalBlkStmt(mb, slimit) < 0)
return 0;
pushInstruction(mb, old[0]);
@@ -68,30 +89,37 @@ OPTiotImplementation(Client cntxt, MalBl
if (old[i]) {
p = old[i];
- if (getModuleId(p) == iotRef && getFunctionId(p) ==
putName("window", 6) &&
- isVarConstant(mb, getArg(p, 1)) &&
isVarConstant(mb, getArg(p, 2)) && isVarConstant(mb, getArg(p, 3)))
- /* let's move the window to the start of the
block when it consists of constants*/
- movetofront=1;
- if (getModuleId(p) == iotRef && (getFunctionId(p) ==
putName("threshold", 9) || getFunctionId(p) == putName("beat", 4)) &&
- isVarConstant(mb, getArg(p, 1)) &&
isVarConstant(mb, getArg(p, 2)))
- /* let's move the threshold/beat to the start
of the block when it consists of constants*/
- movetofront=1;
- if( movetofront){
- movetofront =0;
- pushInstruction(mb, p);
- for (j = mb->stop - 1; j > 1; j--)
- mb->stmt[j] = mb->stmt[j - 1];
- mb->stmt[j] = p;
+ if (getModuleId(p) == sqlRef && getFunctionId(p) == tidRef ){
+
isstream(getVarConstant(mb,getArg(p,2)).val.sval,
getVarConstant(mb,getArg(p,3)).val.sval );
+ if( fnd){
+ alias[getArg(p,0)] = -1;
+ freeInstruction(p);
+ }
+ continue;
+ }
+ if (getModuleId(p) == algebraRef && getFunctionId(p) == projectionRef && alias[getArg(p,1)] < 0){
+ alias[getArg(p,0)] = getArg(p,2);
+ freeInstruction(p);
continue;
}
- if (p->token == ENDsymbol) {
- /* a good place to commit the SQL transaction */
+ if (getModuleId(p) == sqlRef && getFunctionId(p) == affectedRowsRef ){
+ freeInstruction(p);
+ continue;
+ }
+
+ if (p->token == ENDsymbol && btop > 0) {
/* catch any exception left behind */
r = newAssignment(mb);
j = getArg(r, 0) = newVariable(mb, GDKstrdup("SQLexception"), TYPE_str);
setVarUDFtype(mb, j);
r->barrier = CATCHsymbol;
+
+ r = newStmt(mb,iotRef, errorRef);
+ r = pushStr(mb, r, getModuleId(old[0]));
+ r = pushStr(mb, r, getFunctionId(old[0]));
+ r = pushArgument(mb, r, j);
+
r = newAssignment(mb);
getArg(r, 0) = j;
r->barrier = EXITsymbol;
@@ -99,51 +127,25 @@ OPTiotImplementation(Client cntxt, MalBl
j = getArg(r, 0) = newVariable(mb, GDKstrdup("MALexception"), TYPE_str);
setVarUDFtype(mb, j);
r->barrier = CATCHsymbol;
+
+ r = newStmt(mb,iotRef, errorRef);
+ r = pushStr(mb, r, getModuleId(old[0]));
+ r = pushStr(mb, r, getFunctionId(old[0]));
+ r = pushArgument(mb, r, j);
+
r = newAssignment(mb);
getArg(r, 0) = j;
r->barrier = EXITsymbol;
-
break;
}
- if (getModuleId(p) == sqlRef && getFunctionId(p) ==
mvcRef)
- mvc = getArg(p, 0);
-
- /* trim the number of sql instructions dealing with
baskets */
- if (getModuleId(p) == sqlRef && getFunctionId(p) ==
putName("affectedRows", 12)) {
- freeInstruction(p);
- continue;
- }
-
- /* remove consolidation of tid lists */
- if (getModuleId(p) == algebraRef && getFunctionId(p) ==
subjoinRef && tidlist[getArg(p,1)]){
- alias[getArg(p, 0)] = getArg(p,2);
- freeInstruction(p);
- continue;
- }
- /* remove delta processing for baskets */
- if (getModuleId(p) == sqlRef && (getFunctionId(p) ==
deltaRef || getFunctionId(p) == subdeltaRef) ) {
- clrFunction(p);
- p->argc =2;
- pushInstruction(mb, p);
- continue;
- }
-
- /* remove delta processing for baskets */
- if (getModuleId(p) == sqlRef && (getFunctionId(p) ==
projectdeltaRef || getFunctionId(p) == subdeltaRef) ) {
- clrFunction(p);
- setModuleId(p,algebraRef);
- setFunctionId(p,subjoinRef);
- p->argc =3;
- pushInstruction(mb, p);
- continue;
- }
for (j = 0; j < p->argc; j++)
- if (alias[getArg(p, j)])
+ if (alias[getArg(p, j)] > 0)
getArg(p, j) = alias[getArg(p, j)];
- if (getModuleId(p) == sqlRef && getFunctionId(p) ==
appendRef) {
+ if (getModuleId(p) == sqlRef && getFunctionId(p) ==
appendRef ){
+
isstream(getVarConstant(mb,getArg(p,2)).val.sval,
getVarConstant(mb,getArg(p,3)).val.sval );
/* the appends come in multiple steps.
The first initializes an basket update
statement,
which is triggered when we commit the
transaction.
@@ -156,15 +158,12 @@ OPTiotImplementation(Client cntxt, MalBl
for (; i<limit; i++)
if (old[i])
pushInstruction(mb,old[i]);
- (void) stk;
- (void) pci;
OPTDEBUGiot {
mnstr_printf(cntxt->fdout, "#iot optimizer intermediate\n");
printFunction(cntxt->fdout, mb, stk, LIST_MAL_DEBUG);
}
GDKfree(alias);
- GDKfree(tidlist);
- return actions;
+ return btop > 0;
}
diff --git a/monetdb5/optimizer/opt_prelude.c b/monetdb5/optimizer/opt_prelude.c
--- a/monetdb5/optimizer/opt_prelude.c
+++ b/monetdb5/optimizer/opt_prelude.c
@@ -79,6 +79,7 @@ str intersectcandRef;
str eqRef;
str disconnectRef;
str evalRef;
+str errorRef;
str execRef;
str expandRef;
str exportOperationRef;
@@ -296,6 +297,7 @@ void optimizerInit(void)
eqRef = putName("==",2);
disconnectRef= putName("disconnect",10);
evalRef = putName("eval",4);
+ errorRef = putName("error",5);
execRef = putName("exec",4);
expandRef = putName("expand",6);
exportOperationRef = putName("exportOperation",15);
diff --git a/monetdb5/optimizer/opt_prelude.h b/monetdb5/optimizer/opt_prelude.h
--- a/monetdb5/optimizer/opt_prelude.h
+++ b/monetdb5/optimizer/opt_prelude.h
@@ -77,6 +77,7 @@ opt_export str intersectcandRef;
opt_export str eqRef;
opt_export str disconnectRef;
opt_export str evalRef;
+opt_export str errorRef;
opt_export str execRef;
opt_export str expandRef;
opt_export str exportOperationRef;
diff --git a/monetdb5/optimizer/opt_support.c b/monetdb5/optimizer/opt_support.c
--- a/monetdb5/optimizer/opt_support.c
+++ b/monetdb5/optimizer/opt_support.c
@@ -569,7 +569,7 @@ hasSideEffects(InstrPtr p, int strict)
getModuleId(p) != groupRef )
return TRUE;
- if ( getModuleId(p) == iotRef){
+ if ( getModuleId(p) == basketRef){
if( getFunctionId(p) == registerRef)
return TRUE;
}
diff --git a/sql/backends/monet5/iot/Tests/iot00.sql b/sql/backends/monet5/iot/Tests/iot00.sql
--- a/sql/backends/monet5/iot/Tests/iot00.sql
+++ b/sql/backends/monet5/iot/Tests/iot00.sql
@@ -1,6 +1,5 @@
--- Clear the stream testing environment
+-- A simple continuous query.
set schema iot;
-set optimizer='iot_pipe';
create stream table stream_tmp (t timestamp, sensor integer, val decimal(8,2))
;
create table result(like stream_tmp);
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -43,11 +43,11 @@ static int BSKTnewEntry(void)
{
int i;
if (bsktLimit == 0) {
- bsktLimit = MAXBSK;
+ bsktLimit = MAXBSKT;
baskets = (BasketRec *) GDKzalloc(bsktLimit * sizeof(BasketRec));
bsktTop = 1; /* entry 0 is used as non-initialized */
} else if (bsktTop +1 == bsktLimit) {
- bsktLimit += MAXBSK;
+ bsktLimit += MAXBSKT;
baskets = (BasketRec *) GDKrealloc(baskets, bsktLimit * sizeof(BasketRec));
}
for (i = 1; i < bsktLimit; i++)
@@ -97,8 +97,8 @@ BSKTlocate(str sch, str tbl)
}
// Instantiate a basket description for a particular table
-str
-BSKTnewbasket(sql_schema *s, sql_table *t, sql_trans *tr)
+static str
+BSKTnewbasket(sql_schema *s, sql_table *t)
{
int idx, i;
node *o;
@@ -135,7 +135,7 @@ BSKTnewbasket(sql_schema *s, sql_table *
i = 0;
for (o = t->columns.set->h; o; o = o->next) {
c = o->data;
- b =store_funcs.bind_col(tr, c, RD_INS);
+ b= BATnew(TYPE_void, c->type.type->localtype, 0, TRANSIENT);
if (b == NULL) {
BSKTclean(idx);
MT_lock_unset(&iotLock);
@@ -161,7 +161,6 @@ BSKTregister(Client cntxt, MalBlkPtr mb,
sql_table *t;
mvc *m = NULL;
str msg = getSQLContext(cntxt, mb, &m, NULL);
- sql_trans *tr;
str sch, tbl;
if ( msg != MAL_SUCCEED)
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list