Changeset: 8565008b21ce for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8565008b21ce
Modified Files:
        monetdb5/optimizer/opt_iot.c
        monetdb5/optimizer/opt_prelude.c
        monetdb5/optimizer/opt_prelude.h
        monetdb5/optimizer/opt_support.c
        sql/backends/monet5/iot/Tests/iot00.sql
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/basket.h
        sql/backends/monet5/iot/iot.c
        sql/backends/monet5/iot/iot.mal
        sql/backends/monet5/iot/petrinet.c
        sql/backends/monet5/iot/petrinet.h
        sql/backends/monet5/iot/petrinet.mal
        sql/backends/monet5/sql_optimizer.c
Branch: iot
Log Message:

Intermediate commit


diffs (truncated from 889 to 300 lines):

diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c
--- a/monetdb5/optimizer/opt_iot.c
+++ b/monetdb5/optimizer/opt_iot.c
@@ -33,34 +33,55 @@
 #include "opt_statistics.h"
 #include "opt_dataflow.h"
 
+#define MAXBSKT 64
+#define isstream(S,T) \
+       for(fnd=0, k= 0; k< btop; k++) \
+       if( strcmp(schemas[k], S)== 0 && strcmp(tables[k], T)== 0 ){ \
+               fnd= 1; break;\
+       }
+
 int
 OPTiotImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
-       int actions = 0, mvc=0;
-       int i, j, limit, slimit;
+       int mvc=0;
+       int i, j, k, fnd, limit, slimit;
        InstrPtr r, p, *old;
-       int  movetofront=0;
        int *alias;
-       char *tidlist;
+       str  schemas[MAXBSKT];
+       str  tables[MAXBSKT];
+       int btop=0;
 
        (void) pci;
        (void) mvc;
 
+       old = mb->stmt;
+       limit = mb->stop;
+       slimit = mb->ssize;
+
+       /* first analyse the query for streaming tables */
+       for (i = 1; i < limit && btop <MAXBSKT; i++){
+               p = old[i];
+               if( getModuleId(p)== basketRef && getFunctionId(p)== 
registerRef ){
+                       OPTDEBUGiot mnstr_printf(cntxt->fdout, "#iot stream 
table %s.%s\n", getModuleId(p), getFunctionId(p));
+                       schemas[btop]= getVarConstant(mb, getArg(p,1)).val.sval;
+                       tables[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
+                       btop++;
+               }
+       }
+       if( btop == MAXBSKT || btop == 0)
+               return 0;
+
        OPTDEBUGiot {
                mnstr_printf(cntxt->fdout, "#iot optimizer started\n");
                printFunction(cntxt->fdout, mb, stk, LIST_MAL_DEBUG);
        } else
                (void) stk;
 
-       old = mb->stmt;
-       limit = mb->stop;
-       slimit = mb->ssize;
-       if (newMalBlkStmt(mb, slimit) < 0)
+       alias = (int *) GDKzalloc(mb->vtop * 2 * sizeof(int));
+       if (alias == 0)
                return 0;
 
-       alias = (int *) GDKzalloc(mb->vtop * 2 * sizeof(int));
-       tidlist = (char *) GDKzalloc(mb->vtop );
-       if (alias == 0)
+       if (newMalBlkStmt(mb, slimit) < 0)
                return 0;
 
        pushInstruction(mb, old[0]);
@@ -68,30 +89,37 @@ OPTiotImplementation(Client cntxt, MalBl
                if (old[i]) {
                        p = old[i];
 
-                       if (getModuleId(p) == iotRef && getFunctionId(p) == 
putName("window", 6) &&
-                               isVarConstant(mb, getArg(p, 1)) && 
isVarConstant(mb, getArg(p, 2)) && isVarConstant(mb, getArg(p, 3)))
-                               /* let's move the window to the start of the 
block  when it consists of constants*/
-                               movetofront=1;
-                       if (getModuleId(p) == iotRef && (getFunctionId(p) == 
putName("threshold", 9) || getFunctionId(p) == putName("beat", 4)) &&
-                               isVarConstant(mb, getArg(p, 1)) && 
isVarConstant(mb, getArg(p, 2)))
-                               /* let's move the threshold/beat to the start 
of the block  when it consists of constants*/
-                               movetofront=1;
-                       if( movetofront){
-                               movetofront =0;
-                               pushInstruction(mb, p);
-                               for (j = mb->stop - 1; j > 1; j--)
-                                       mb->stmt[j] = mb->stmt[j - 1];
-                               mb->stmt[j] = p;
+                       if (getModuleId(p) == sqlRef && getFunctionId(p) == 
tidRef ){
+                               
isstream(getVarConstant(mb,getArg(p,2)).val.sval, 
getVarConstant(mb,getArg(p,3)).val.sval );
+                               if( fnd){
+                                       alias[getArg(p,0)] = -1;
+                                       freeInstruction(p);
+                               }
+                               continue;
+                       }
+                       if (getModuleId(p) == algebraRef && getFunctionId(p) == 
projectionRef && alias[getArg(p,1)] < 0){
+                               alias[getArg(p,0)] = getArg(p,2);
+                               freeInstruction(p);
                                continue;
                        }
 
-                       if (p->token == ENDsymbol) {
-                               /* a good place to commit the SQL transaction */
+                       if (getModuleId(p) == sqlRef && getFunctionId(p) == 
affectedRowsRef ){
+                               freeInstruction(p);
+                               continue;
+                       }
+
+                       if (p->token == ENDsymbol && btop > 0) {
                                /* catch any exception left behind */
                                r = newAssignment(mb);
                                j = getArg(r, 0) = newVariable(mb, 
GDKstrdup("SQLexception"), TYPE_str);
                                setVarUDFtype(mb, j);
                                r->barrier = CATCHsymbol;
+
+                               r = newStmt(mb,iotRef, errorRef);
+                               r = pushStr(mb, r, getModuleId(old[0]));
+                               r = pushStr(mb, r, getFunctionId(old[0]));
+                               r = pushArgument(mb, r, j);
+
                                r = newAssignment(mb);
                                getArg(r, 0) = j;
                                r->barrier = EXITsymbol;
@@ -99,51 +127,25 @@ OPTiotImplementation(Client cntxt, MalBl
                                j = getArg(r, 0) = newVariable(mb, 
GDKstrdup("MALexception"), TYPE_str);
                                setVarUDFtype(mb, j);
                                r->barrier = CATCHsymbol;
+
+                               r = newStmt(mb,iotRef, errorRef);
+                               r = pushStr(mb, r, getModuleId(old[0]));
+                               r = pushStr(mb, r, getFunctionId(old[0]));
+                               r = pushArgument(mb, r, j);
+
                                r = newAssignment(mb);
                                getArg(r, 0) = j;
                                r->barrier = EXITsymbol;
-
                                break;
                        }
 
-                       if (getModuleId(p) == sqlRef && getFunctionId(p) == 
mvcRef)
-                               mvc = getArg(p, 0);
-
-                       /* trim the number of sql instructions dealing with 
baskets */
-                       if (getModuleId(p) == sqlRef && getFunctionId(p) == 
putName("affectedRows", 12)) {
-                               freeInstruction(p);
-                               continue;
-                       }
-
-                       /* remove consolidation of tid lists */
-                       if (getModuleId(p) == algebraRef && getFunctionId(p) == 
subjoinRef  && tidlist[getArg(p,1)]){
-                               alias[getArg(p, 0)] = getArg(p,2);
-                               freeInstruction(p);
-                               continue;
-                       }
-                       /* remove delta processing for baskets */
-                       if (getModuleId(p) == sqlRef && (getFunctionId(p) == 
deltaRef || getFunctionId(p) == subdeltaRef) ) {
-                               clrFunction(p);
-                               p->argc =2;
-                               pushInstruction(mb, p);
-                               continue;
-                       }
-
-                       /* remove delta processing for baskets */
-                       if (getModuleId(p) == sqlRef && (getFunctionId(p) == 
projectdeltaRef || getFunctionId(p) == subdeltaRef) ) {
-                               clrFunction(p);
-                               setModuleId(p,algebraRef);
-                               setFunctionId(p,subjoinRef);
-                               p->argc =3;
-                               pushInstruction(mb, p);
-                               continue;
-                       }
 
                        for (j = 0; j < p->argc; j++)
-                               if (alias[getArg(p, j)])
+                               if (alias[getArg(p, j)] > 0)
                                        getArg(p, j) = alias[getArg(p, j)];
 
-                       if (getModuleId(p) == sqlRef && getFunctionId(p) == 
appendRef) {
+                       if (getModuleId(p) == sqlRef && getFunctionId(p) == 
appendRef ){
+                               
isstream(getVarConstant(mb,getArg(p,2)).val.sval, 
getVarConstant(mb,getArg(p,3)).val.sval );
                                /* the appends come in multiple steps.
                                   The first initializes a basket update 
statement,
                                   which is triggered when we commit the 
transaction.
@@ -156,15 +158,12 @@ OPTiotImplementation(Client cntxt, MalBl
     for (; i<limit; i++)
         if (old[i])
             pushInstruction(mb,old[i]);
-       (void) stk;
-       (void) pci;
 
        OPTDEBUGiot {
                mnstr_printf(cntxt->fdout, "#iot optimizer intermediate\n");
                printFunction(cntxt->fdout, mb, stk, LIST_MAL_DEBUG);
        } 
        GDKfree(alias);
-       GDKfree(tidlist);
-       return actions;
+       return btop > 0;
 }
 
diff --git a/monetdb5/optimizer/opt_prelude.c b/monetdb5/optimizer/opt_prelude.c
--- a/monetdb5/optimizer/opt_prelude.c
+++ b/monetdb5/optimizer/opt_prelude.c
@@ -79,6 +79,7 @@ str intersectcandRef;
 str eqRef;
 str disconnectRef;
 str evalRef;
+str errorRef;
 str execRef;
 str expandRef;
 str exportOperationRef;
@@ -296,6 +297,7 @@ void optimizerInit(void)
        eqRef = putName("==",2);
        disconnectRef= putName("disconnect",10);
        evalRef = putName("eval",4);
+       errorRef = putName("error",5);
        execRef = putName("exec",4);
        expandRef = putName("expand",6);
        exportOperationRef = putName("exportOperation",15);
diff --git a/monetdb5/optimizer/opt_prelude.h b/monetdb5/optimizer/opt_prelude.h
--- a/monetdb5/optimizer/opt_prelude.h
+++ b/monetdb5/optimizer/opt_prelude.h
@@ -77,6 +77,7 @@ opt_export  str intersectcandRef;
 opt_export  str eqRef;
 opt_export  str disconnectRef;
 opt_export  str evalRef;
+opt_export  str errorRef;
 opt_export  str execRef;
 opt_export  str expandRef;
 opt_export     str exportOperationRef;
diff --git a/monetdb5/optimizer/opt_support.c b/monetdb5/optimizer/opt_support.c
--- a/monetdb5/optimizer/opt_support.c
+++ b/monetdb5/optimizer/opt_support.c
@@ -569,7 +569,7 @@ hasSideEffects(InstrPtr p, int strict)
                getModuleId(p) != groupRef )
                return TRUE;
 
-       if ( getModuleId(p) == iotRef){
+       if ( getModuleId(p) == basketRef){
                if( getFunctionId(p) == registerRef)
                        return TRUE;
        }
diff --git a/sql/backends/monet5/iot/Tests/iot00.sql 
b/sql/backends/monet5/iot/Tests/iot00.sql
--- a/sql/backends/monet5/iot/Tests/iot00.sql
+++ b/sql/backends/monet5/iot/Tests/iot00.sql
@@ -1,6 +1,5 @@
--- Clear the stream testing environment
+-- A simple continuous query.
 set schema iot;
-set optimizer='iot_pipe';
 
 create stream table stream_tmp (t timestamp, sensor integer, val decimal(8,2)) 
;
 create table result(like stream_tmp);
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -43,11 +43,11 @@ static int BSKTnewEntry(void)
 {
        int i;
        if (bsktLimit == 0) {
-               bsktLimit = MAXBSK;
+               bsktLimit = MAXBSKT;
                baskets = (BasketRec *) GDKzalloc(bsktLimit * 
sizeof(BasketRec));
                bsktTop = 1; /* entry 0 is used as non-initialized */
        } else if (bsktTop +1 == bsktLimit) {
-               bsktLimit += MAXBSK;
+               bsktLimit += MAXBSKT;
                baskets = (BasketRec *) GDKrealloc(baskets, bsktLimit * 
sizeof(BasketRec));
        }
        for (i = 1; i < bsktLimit; i++)
@@ -97,8 +97,8 @@ BSKTlocate(str sch, str tbl)
 }
 
 // Instantiate a basket description for a particular table
-str
-BSKTnewbasket(sql_schema *s, sql_table *t, sql_trans *tr)
+static str
+BSKTnewbasket(sql_schema *s, sql_table *t)
 {
        int idx, i;
        node *o;
@@ -135,7 +135,7 @@ BSKTnewbasket(sql_schema *s, sql_table *
        i = 0;
        for (o = t->columns.set->h; o; o = o->next) {
                c = o->data;
-               b =store_funcs.bind_col(tr, c, RD_INS);
+               b= BATnew(TYPE_void, c->type.type->localtype, 0, TRANSIENT);
                if (b == NULL) {
                        BSKTclean(idx);
                        MT_lock_unset(&iotLock);
@@ -161,7 +161,6 @@ BSKTregister(Client cntxt, MalBlkPtr mb,
        sql_table   *t;
        mvc *m = NULL;
        str msg = getSQLContext(cntxt, mb, &m, NULL);
-       sql_trans *tr;
        str sch, tbl;
 
        if ( msg != MAL_SUCCEED)
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to