Changeset: 086a0278a6a8 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=086a0278a6a8
Modified Files:
        clients/Tests/malcheck.stable.out
        monetdb5/optimizer/opt_iot.c
        monetdb5/optimizer/opt_prelude.c
        monetdb5/optimizer/opt_prelude.h
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/basket.h
        sql/backends/monet5/iot/basket.mal
        sql/backends/monet5/iot/iot.c
        sql/backends/monet5/iot/iot.h
        sql/backends/monet5/iot/petrinet.c
        sql/backends/monet5/iot/petrinet.h
        sql/backends/monet5/sql_optimizer.c
Branch: iot
Log Message:

Intermittent commit
- add delete code
- add transaction brackets
- improve optimizer
- reduce lock management


diffs (truncated from 767 to 300 lines):

diff --git a/clients/Tests/malcheck.stable.out 
b/clients/Tests/malcheck.stable.out
--- a/clients/Tests/malcheck.stable.out
+++ b/clients/Tests/malcheck.stable.out
@@ -10,6 +10,7 @@ BSKTthreshold: missing for MAL command t
 BSKTwindow: missing for MAL command window in 
sql/backends/monet5/iot/basket.mal
 BSKTtimewindow: missing for MAL command timewindow in 
sql/backends/monet5/iot/basket.mal
 BSKTbeat: missing for MAL command beat in sql/backends/monet5/iot/basket.mal
+IOTstop: missing for MAL pattern step in sql/backends/monet5/iot/iot.mal
 PNstep: missing for MAL pattern step in sql/backends/monet5/iot/petrinet.mal
 
 # 15:16:26 >  
diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c
--- a/monetdb5/optimizer/opt_iot.c
+++ b/monetdb5/optimizer/opt_iot.c
@@ -45,16 +45,17 @@
 int
 OPTiotImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
-       int mvc=0;
        int i, j, k, fnd, limit, slimit;
        InstrPtr r, p, *old;
        int *alias;
        str  schemas[MAXBSKT];
        str  tables[MAXBSKT];
+       int  mvc[MAXBSKT];
+       int done[MAXBSKT]= {0};
        int btop=0;
+       int commit =0;
 
        (void) pci;
-       (void) mvc;
 
        old = mb->stmt;
        limit = mb->stop;
@@ -63,11 +64,31 @@ OPTiotImplementation(Client cntxt, MalBl
        /* first analyse the query for streaming tables */
        for (i = 1; i < limit && btop <MAXBSKT; i++){
                p = old[i];
-               if( getModuleId(p)== basketRef && getFunctionId(p)== 
registerRef ){
+               if( getModuleId(p)== basketRef && (getFunctionId(p)== 
registerRef || getFunctionId(p)== bindRef || getFunctionId(p)== clear_tableRef) 
 ){
                        OPTDEBUGiot mnstr_printf(cntxt->fdout, "#iot stream 
table %s.%s\n", getModuleId(p), getFunctionId(p));
                        schemas[btop]= getVarConstant(mb, getArg(p,1)).val.sval;
                        tables[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
-                       btop++;
+                       mvc[btop] = getArg(p,0);
+                       for( j =0; j< btop ; j++)
+                       if( strcmp(schemas[j], schemas[j+1])==0  && 
strcmp(tables[j],tables[j+1]) ==0)
+                               break;
+                       mvc[j] = getArg(p,0);
+                       done[j]= done[j]== 0 || getFunctionId(p)== registerRef;
+                       if( j == btop)
+                               btop++;
+               }
+               if( getModuleId(p)== basketRef && (getFunctionId(p) == 
appendRef || getFunctionId(p) == deleteRef )){
+                       OPTDEBUGiot mnstr_printf(cntxt->fdout, "#iot stream 
table %s.%s\n", getModuleId(p), getFunctionId(p));
+                       schemas[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
+                       tables[btop]= getVarConstant(mb, getArg(p,3)).val.sval;
+                       mvc[btop] = getArg(p,0);
+                       for( j =0; j< btop ; j++)
+                       if( strcmp(schemas[j], schemas[j+1])==0  && 
strcmp(tables[j],tables[j+1]) ==0)
+                               break;
+
+                       mvc[j] = getArg(p,0);
+                       if( j == btop)
+                               btop++;
                }
        }
        if( btop == MAXBSKT || btop == 0)
@@ -87,10 +108,22 @@ OPTiotImplementation(Client cntxt, MalBl
                return 0;
 
        pushInstruction(mb, old[0]);
+       // register all baskets used
+       for( j=0; j<btop; j++)
+       if( done[j]==0) {
+               p= newStmt(mb,basketRef,registerRef);
+               p= pushStr(mb,p, schemas[j]);
+               p= pushStr(mb,p, tables[j]);
+       }
+       p= newStmt(mb, sqlRef, transactionRef);
        for (i = 1; i < limit; i++)
                if (old[i]) {
                        p = old[i];
 
+                       if(getModuleId(p) == sqlRef && getFunctionId(p)== 
transactionRef){
+                               freeInstruction(p);
+                               continue;
+                       }
                        if (getModuleId(p) == sqlRef && getFunctionId(p) == 
tidRef ){
                                
isstream(getVarConstant(mb,getArg(p,2)).val.sval, 
getVarConstant(mb,getArg(p,3)).val.sval );
                                if( fnd){
@@ -106,11 +139,24 @@ OPTiotImplementation(Client cntxt, MalBl
                        }
 
                        if (getModuleId(p) == sqlRef && getFunctionId(p) == 
affectedRowsRef ){
+                               for(j = 0; j < btop; j++){
+                                       r =  newStmt(mb, basketRef, commitRef);
+                                       if (alias[mvc[j]] > 0)
+                                               r =  pushArgument(mb,r, 
alias[mvc[j]]);
+                                       else
+                                               r =  pushArgument(mb,r, mvc[j]);
+                                       r =  pushStr(mb,r, schemas[j]);
+                                       r =  pushStr(mb,r, tables[j]);
+                               }
                                freeInstruction(p);
                                continue;
                        }
 
-                       if (p->token == ENDsymbol && btop > 0) {
+                       if( getModuleId(p)== sqlRef && getFunctionId(p) 
==commitRef)
+                               commit++;
+                       if (p->token == ENDsymbol && btop > 0 && commit == 0) {
+                               commit++;
+                               (void) newStmt(mb, sqlRef, commitRef);
                                /* catch any exception left behind */
                                r = newAssignment(mb);
                                j = getArg(r, 0) = newVariable(mb, 
GDKstrdup("SQLexception"), TYPE_str);
diff --git a/monetdb5/optimizer/opt_prelude.c b/monetdb5/optimizer/opt_prelude.c
--- a/monetdb5/optimizer/opt_prelude.c
+++ b/monetdb5/optimizer/opt_prelude.c
@@ -225,6 +225,7 @@ str unpinRef;
 str updateRef;
 str subselectRef;
 str timestampRef;
+str transactionRef;
 str thetasubselectRef;
 str likesubselectRef;
 str ilikesubselectRef;
@@ -433,6 +434,7 @@ void optimizerInit(void)
        subsortRef = putName("subsort",7);
        takeRef= putName("take",5);
        timestampRef = putName("timestamp", 9);
+       transactionRef = putName("transaction", 11);
        not_uniqueRef= putName("not_unique",10);
        sampleRef= putName("sample",6);
        subuniqueRef= putName("subunique",9);
diff --git a/monetdb5/optimizer/opt_prelude.h b/monetdb5/optimizer/opt_prelude.h
--- a/monetdb5/optimizer/opt_prelude.h
+++ b/monetdb5/optimizer/opt_prelude.h
@@ -209,6 +209,7 @@ opt_export  str subsumRef;
 opt_export  str subavgRef;
 opt_export  str subsortRef;
 opt_export  str timestampRef;
+opt_export  str transactionRef;
 opt_export  str takeRef;
 opt_export  str not_uniqueRef;
 opt_export  str sampleRef;
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -36,7 +36,7 @@
 //#define _DEBUG_BASKET_ if(0)
 #define _DEBUG_BASKET_ 
 
-str statusname[3] = { "<unknown>", "running", "paused" };
+str statusname[4] = { "<unknown>", "running", "paused", "locked" };
 
 BasketRec *baskets;   /* the global iot catalog */
 static int bsktTop = 0, bsktLimit = 0;
@@ -73,7 +73,6 @@ BSKTclean(int idx)
        BBPreclaim(baskets[idx].errors);
        baskets[idx].errors = NULL;
        baskets[idx].count = 0;
-       MT_lock_destroy(&baskets[idx].lock);
 }
 
 // locate the basket in the catalog
@@ -101,21 +100,23 @@ BSKTnewbasket(sql_schema *s, sql_table *
        // Don't introduce the same basket twice
        if( BSKTlocate(s->base.name, t->base.name) > 0)
                return MAL_SUCCEED;
-       //MT_lock_set(&iotLock);
+       MT_lock_set(&iotLock);
        idx = BSKTnewEntry();
-       MT_lock_init(&baskets[idx].lock,"newbasket");
 
        baskets[idx].schema_name = GDKstrdup(s->base.name);
        baskets[idx].table_name = GDKstrdup(t->base.name);
        baskets[idx].seen = * timestamp_nil;
 
+       baskets[idx].status = BSKTPAUSED;
        baskets[idx].count = 0;
        for (o = t->columns.set->h; o; o = o->next){
         sql_column *col = o->data;
         int tpe = col->type.type->localtype;
 
-        if ( !(tpe < TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime || 
tpe == TYPE_timestamp) )
+        if ( !(tpe < TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime || 
tpe == TYPE_timestamp) ){
+                       MT_lock_unset(&iotLock);
                        throw(MAL,"baskets.register","Unsupported type %d",tpe);
+               }
                baskets[idx].count++;
        }
        // collect the column names
@@ -136,7 +137,7 @@ BSKTnewbasket(sql_schema *s, sql_table *
 
        baskets[idx].schema = s;
        baskets[idx].table = t;
-       //MT_lock_unset(&iotLock);
+       MT_lock_unset(&iotLock);
        return MAL_SUCCEED;
 }
 
@@ -194,16 +195,17 @@ BSKTactivate(Client cntxt, MalBlkPtr mb,
                /* check for registration */
                idx = BSKTlocate(sch, tbl);
                if( idx == 0)
-                       throw(SQL,"basket.activate","Stream table %s.%s not 
accessible\n",sch,tbl);
-               MT_lock_set(&baskets[idx].lock);
-               baskets[idx].status = BSKTRUNNING;
-               MT_lock_unset(&baskets[idx].lock);
+                       throw(SQL,"basket.activate","Stream table %s.%s not 
accessible to activate\n",sch,tbl);
+               if( baskets[idx].status == BSKTPAUSED){
+                       MT_lock_set(&iotLock);
+                       baskets[idx].status = BSKTRUNNING;
+                       MT_lock_unset(&iotLock);
+               }
        } else {
-               for( idx =1; idx <bsktTop;  idx++){
-                       MT_lock_set(&baskets[idx].lock);
+               MT_lock_set(&iotLock);
+               for( idx =1; idx <bsktTop;  idx++)
                        baskets[idx].status = BSKTRUNNING;
-                       MT_lock_unset(&baskets[idx].lock);
-               }
+               MT_lock_unset(&iotLock);
        }
        return MAL_SUCCEED;
 }
@@ -223,16 +225,17 @@ BSKTdeactivate(Client cntxt, MalBlkPtr m
                /* check for registration */
                idx = BSKTlocate(sch, tbl);
                if( idx == 0)
-                       throw(SQL,"basket.activate","Stream table %s.%s not 
accessible\n",sch,tbl);
-               MT_lock_set(&baskets[idx].lock);
-               baskets[idx].status = BSKTPAUSED;
-               MT_lock_unset(&baskets[idx].lock);
+                       throw(SQL,"basket.activate","Stream table %s.%s not 
accessible to deactivate\n",sch,tbl);
+               if( baskets[idx].status == BSKTRUNNING){
+                       MT_lock_set(&iotLock);
+                       baskets[idx].status = BSKTPAUSED;
+                       MT_lock_unset(&iotLock);
+               }
        } else {
-               for( idx =1; idx <bsktTop;  idx++){
-                       MT_lock_set(&baskets[idx].lock);
+               MT_lock_set(&iotLock);
+               for( idx =1; idx <bsktTop;  idx++)
                        baskets[idx].status = BSKTPAUSED;
-                       MT_lock_unset(&baskets[idx].lock);
-               }
+               MT_lock_unset(&iotLock);
        }
        return MAL_SUCCEED;
 }
@@ -279,45 +282,6 @@ BSKTbind(Client cntxt, MalBlkPtr mb, Mal
        throw(SQL,"iot.bind","Stream table column '%s.%s.%s' not 
found",sch,tbl,col);
 }
 
-/*
- * The locks are designated towards the baskets.
- * If you can not grab the lock then we have to wait.
- */
-str BSKTlock(void *ret, str *sch, str *tbl, int *delay)
-{
-       int bskt;
-
-       bskt = BSKTlocate(*sch, *tbl);
-       if (bskt <= 0)
-               throw(SQL, "basket.lock", "Could not find the basket 
%s.%s",*sch,*tbl);
-       _DEBUG_BASKET_ mnstr_printf(BSKTout, "lock group %s.%s\n", *sch, *tbl);
-       MT_lock_set(&baskets[bskt].lock);
-       _DEBUG_BASKET_ mnstr_printf(BSKTout, "got  group locked %s.%s\n", *sch, 
*tbl);
-       (void) delay;  /* control spinlock */
-       (void) ret;
-       return MAL_SUCCEED;
-}
-
-
-str BSKTlock2(void *ret, str *sch, str *tbl)
-{
-       int delay = 0;
-       return BSKTlock(ret, sch, tbl, &delay);
-}
-
-str BSKTunlock(void *ret, str *sch,str *tbl)
-{
-       int bskt;
-
-       (void) ret;
-       bskt = BSKTlocate(*sch,*tbl);
-       if (bskt == 0)
-               throw(SQL, "basket.lock", "Could not find the basket 
%s.%s",*sch,*tbl);
-       MT_lock_unset(&baskets[bskt].lock);
-       return MAL_SUCCEED;
-}
-
-
 str
 BSKTdrop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to