Changeset: bb0653af7925 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=bb0653af7925
Modified Files:
        monetdb5/mal/mal_instruction.c
        monetdb5/optimizer/opt_cquery.c
        monetdb5/optimizer/opt_support.c
        sql/backends/monet5/51_basket.mal
        sql/backends/monet5/Tests/basket00.malC
        sql/backends/monet5/Tests/cqstream00.sql
        sql/backends/monet5/sql_basket.c
        sql/backends/monet5/sql_basket.h
        sql/backends/monet5/sql_cquery.c
        sql/backends/monet5/sql_cquery.h
        sql/scripts/50_cquery.sql
Branch: timetrails
Log Message:

Sync before cleanup of cquery admin


diffs (truncated from 797 to 300 lines):

diff --git a/monetdb5/mal/mal_instruction.c b/monetdb5/mal/mal_instruction.c
--- a/monetdb5/mal/mal_instruction.c
+++ b/monetdb5/mal/mal_instruction.c
@@ -151,7 +151,6 @@ resizeMalBlk(MalBlkPtr mb, int elements)
 {
        int i;
 
-       assert(mb->vsize >= mb->ssize);
        if( elements > mb->ssize){
                InstrPtr *ostmt = mb->stmt;
                mb->stmt = (InstrPtr *) GDKrealloc(mb->stmt, elements * 
sizeof(InstrPtr));
diff --git a/monetdb5/optimizer/opt_cquery.c b/monetdb5/optimizer/opt_cquery.c
--- a/monetdb5/optimizer/opt_cquery.c
+++ b/monetdb5/optimizer/opt_cquery.c
@@ -38,6 +38,8 @@
                fnd= 1; break;\
        }
 
+#define DEBUG_OPT_CQUERY
+
 str
 OPTcqueryImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci)
 {
@@ -78,11 +80,11 @@ OPTcqueryImplementation(Client cntxt, Ma
        for (i = 1; i < limit && btop <MAXBSKT; i++){
                p = old[i];
                if( getModuleId(p)== basketRef && (getFunctionId(p)== 
registerRef || getFunctionId(p)== bindRef )  ){
-#ifdef DEBUG_OPT_CQUERY
-                       mnstr_printf(cntxt->fdout, "#cquery stream table 
%s.%s\n", getModuleId(p), getFunctionId(p));
-#endif
                        schemas[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
                        tables[btop]= getVarConstant(mb, getArg(p,3)).val.sval;
+#ifdef DEBUG_OPT_CQUERY
+                       mnstr_printf(cntxt->fdout, "#cquery bind stream table 
%s.%s\n", schemas[btop], tables[btop]);
+#endif
                        for( j =0; j< btop ; j++)
                        if( strcmp(schemas[j], schemas[btop])==0  && 
strcmp(tables[j],tables[btop]) ==0)
                                break;
@@ -91,11 +93,11 @@ OPTcqueryImplementation(Client cntxt, Ma
                                btop++;
                }
                if( getModuleId(p)== basketRef && getFunctionId(p) == appendRef 
){
-#ifdef DEBUG_OPT_CQUERY
-                       mnstr_printf(cntxt->fdout, "#cquery stream table 
%s.%s\n", getModuleId(p), getFunctionId(p));
-#endif
                        schemas[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
                        tables[btop]= getVarConstant(mb, getArg(p,3)).val.sval;
+#ifdef DEBUG_OPT_CQUERY
+                       mnstr_printf(cntxt->fdout, "#cquery append stream table 
%s.%s\n", schemas[btop], tables[btop]);
+#endif
                        for( j =0; j< btop ; j++)
                        if( strcmp(schemas[j], schemas[btop])==0  && 
strcmp(tables[j],tables[btop]) ==0)
                                break;
@@ -105,11 +107,11 @@ OPTcqueryImplementation(Client cntxt, Ma
                                btop++;
                }
                if( getModuleId(p)== basketRef && getFunctionId(p) == updateRef 
){
-#ifdef DEBUG_OPT_CQUERY
-                       mnstr_printf(cntxt->fdout, "#cquery stream table 
%s.%s\n", getModuleId(p), getFunctionId(p));
-#endif
                        schemas[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
                        tables[btop]= getVarConstant(mb, getArg(p,3)).val.sval;
+#ifdef DEBUG_OPT_CQUERY
+                       mnstr_printf(cntxt->fdout, "#cquery update stream table 
%s.%s\n", schemas[btop], tables[btop]);
+#endif
                        for( j =0; j< btop ; j++)
                        if( strcmp(schemas[j], schemas[btop])==0  && 
strcmp(tables[j],tables[btop]) ==0)
                                break;
@@ -119,11 +121,11 @@ OPTcqueryImplementation(Client cntxt, Ma
                                btop++;
                }
                if( getModuleId(p)== cqueryRef && getFunctionId(p) == 
basketRef){
-#ifdef DEBUG_OPT_CQUERY
-                       mnstr_printf(cntxt->fdout, "#cquery stream table 
%s.%s\n", getModuleId(p), getFunctionId(p));
-#endif
                        schemas[btop]= getVarConstant(mb, getArg(p,1)).val.sval;
                        tables[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
+#ifdef DEBUG_OPT_CQUERY
+                       mnstr_printf(cntxt->fdout, "#cquery basket ref 
%s.%s\n", schemas[btop],tables[btop]);
+#endif
                        for( j =0; j< btop ; j++)
                        if( strcmp(schemas[j], schemas[btop])==0  && 
strcmp(tables[j],tables[btop]) ==0)
                                break;
@@ -138,13 +140,12 @@ OPTcqueryImplementation(Client cntxt, Ma
                        lastmvc = getArg(p,1);
                }
        }
+#ifdef DEBUG_OPT_CQUERY
+       mnstr_printf(cntxt->fdout, "#cquery optimizer started with %d streams, 
mvc %d\n", btop,lastmvc);
+#endif
        if( btop == MAXBSKT || btop == 0)
                return MAL_SUCCEED;
 
-#ifdef DEBUG_OPT_CQUERY
-       mnstr_printf(cntxt->fdout, "#cquery optimizer started with %d streams, 
mvc %d\n", btop,lastmvc);
-       printFunction(cntxt->fdout, mb, stk, LIST_MAL_DEBUG);
-#endif
        (void) stk;
 
        alias = (int *) GDKzalloc(mb->vtop * 2 * sizeof(int));
diff --git a/monetdb5/optimizer/opt_support.c b/monetdb5/optimizer/opt_support.c
--- a/monetdb5/optimizer/opt_support.c
+++ b/monetdb5/optimizer/opt_support.c
@@ -446,6 +446,8 @@ hasSideEffects(MalBlkPtr mb, InstrPtr p,
                return TRUE;
        if ( getModuleId(p) == remoteRef)
                return TRUE;
+       if ( getModuleId(p) == cqueryRef)
+               return TRUE;
        return FALSE;
 }
 
diff --git a/sql/backends/monet5/51_basket.mal 
b/sql/backends/monet5/51_basket.mal
--- a/sql/backends/monet5/51_basket.mal
+++ b/sql/backends/monet5/51_basket.mal
@@ -45,21 +45,17 @@ comment "Remove tuples from a basket";
 pattern clear_table(sname:str,tname:str):lng
 address mvc_clear_table_wrap;
 
-pattern setwindow(sch:str, tbl:str, elm:int):int
-address BSKTsetwindow
+pattern window(sch:str, tbl:str, elm:int):int
+address BSKTwindow
 comment "Set window size";
 
-pattern window(mvc:any, sch:str, tbl:str):int
+pattern window(sch:str, tbl:str, elm:int, stride:int):int
 address BSKTwindow
-comment "Apply window selection";
+comment "Set window size and stride";
 
-pattern settumble(sch:str, tbl:str, elm:int):int
-address BSKTsettumble
-comment "Set stride";
-
-pattern tumble(mvc:any, sch:str, tbl:str):int
+unsafe pattern tumble(mvc:any, sch:str, tbl:str):int
 address BSKTtumble
-comment "Apply tumbling to the basket";
+comment "Eat away and slide forward";
 
 unsafe pattern lock(mvc:any, sch:str, tbl:str):int
 address BSKTlock
@@ -81,7 +77,7 @@ pattern drop(mvc:int,sch:str,tbl:str):in
 address BSKTdrop
 comment "Remove the basket";
 
-pattern status() 
(seen:bat[:timestamp],sch:bat[:str],tbl:bat[:str],state:bat[:str], 
window:bat[:int], stride:bat[:int],events:bat[:int],cycles:bat[:int], 
error:bat[:str])
+pattern status() (seen:bat[:timestamp],sch:bat[:str],tbl:bat[:str], 
window:bat[:int], stride:bat[:int],events:bat[:int],cycles:bat[:int], 
error:bat[:str])
 address BSKTstatus
 comment "Show the status of the baskets";
 
diff --git a/sql/backends/monet5/Tests/basket00.malC 
b/sql/backends/monet5/Tests/basket00.malC
--- a/sql/backends/monet5/Tests/basket00.malC
+++ b/sql/backends/monet5/Tests/basket00.malC
@@ -4,4 +4,6 @@
 
 io.print(seen,sch,tbl,state,window,stride,events,cycles,error);
 
+basket.window("sys","tables",2,1);
+
 basket.dump();
diff --git a/sql/backends/monet5/Tests/cqstream00.sql 
b/sql/backends/monet5/Tests/cqstream00.sql
--- a/sql/backends/monet5/Tests/cqstream00.sql
+++ b/sql/backends/monet5/Tests/cqstream00.sql
@@ -1,21 +1,19 @@
 -- Example of a stream splitter
 create stream table stmp2 (t timestamp, sensor integer, val decimal(8,2)) ;
+-- SET WINDOW 2 STRIDE 1
+call cquery.window('sys','stmp2',2,1); -- consume 2 tuples and tumble 1 from 
this stream
+
 create table result1(like stmp2);
 create table result2(like stmp2);
 
 -- CREATE CONTINUOUS QUERY cq_splitter
 create procedure cq_splitter()
 begin
-       -- If you use a stream table there should be a tumble option being set 
(by default all)
-    call cquery.tumble('sys','stmp2',1); -- consume one tuple at a time
     insert into result1 select * from stmp2 where val <12;
     insert into result2 select * from stmp2 where val >12;
 end;
 call cquery.register('sys','cq_splitter');
-
--- The stream use in the CQ determines the activation and sets the scheduler 
heartbeat to -1
--- If set explictly it overrules the window based bounds
-call cquery.heartbeat(-1);
+select * from cquery.streams();
 
 insert into stmp2 values('2005-09-23 12:34:26.000',1,11.0);
 insert into stmp2 values('2005-09-23 12:34:27.000',1,11.0);
@@ -36,6 +34,7 @@ select * from stmp2;
 select * from result1;
 select * from result2;
 
+select * from cquery.status();
 select * from cquery.log();
 
 -- ideally auto remove upon dropping the procedure
diff --git a/sql/backends/monet5/sql_basket.c b/sql/backends/monet5/sql_basket.c
--- a/sql/backends/monet5/sql_basket.c
+++ b/sql/backends/monet5/sql_basket.c
@@ -34,15 +34,13 @@
 #include "mal_builder.h"
 #include "opt_prelude.h"
 
-#define _DEBUG_BASKET_ if(0)
+#define _DEBUG_BASKET_ if(1)
 
-str statusname[3] = { "<unknown>", "waiting", "filled" };
-
-static BasketRec *baskets;   /* the global timetrails catalog */
-static int bsktTop = 0, bsktLimit = 0;
+BasketRec *baskets;   /* the global timetrails catalog */
+int bsktTop = 0, bsktLimit = 0;
 
 // locate the basket in the basket catalog
-static int
+int
 BSKTlocate(str sch, str tbl)
 {
        int i;
@@ -125,7 +123,6 @@ BSKTnewbasket(mvc *m, sql_schema *s, sql
        if( !isStream(t))
                throw(MAL,"basket.register","Only allowed for stream tables");
 
-       MT_lock_set(&ttrLock);
        idx = BSKTnewEntry();
 
        baskets[idx].schema = GDKstrdup(s->base.name);
@@ -137,10 +134,8 @@ BSKTnewbasket(mvc *m, sql_schema *s, sql
         sql_column *col = o->data;
         int tpe = col->type.type->localtype;
 
-        if ( !(tpe <= TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime || 
tpe == TYPE_timestamp) ){
-                       MT_lock_unset(&ttrLock);
+        if ( !(tpe <= TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime || 
tpe == TYPE_timestamp) )
                        throw(MAL,"basket.register","Unsupported type 
%d\n",tpe);
-               }
                colcnt++;
        }
        if( colcnt == MAXCOLS-1){
@@ -157,12 +152,11 @@ BSKTnewbasket(mvc *m, sql_schema *s, sql
                baskets[idx].bats[i]= b;
                baskets[idx].cols[i++]=  GDKstrdup(col->base.name);
        }
-       MT_lock_unset(&ttrLock);
        return MAL_SUCCEED;
 }
 
 // MAL/SQL interface for registration of a single table
-static str
+str
 BSKTregisterInternal(Client cntxt, MalBlkPtr mb, str sch, str tbl)
 {
        sql_schema  *s;
@@ -206,7 +200,7 @@ BSKTregister(Client cntxt, MalBlkPtr mb,
 }
 
 str
-BSKTsetwindow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+BSKTwindow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        str sch = *getArgReference_str(stk,pci,1);
        str tbl = *getArgReference_str(stk,pci,2);
@@ -235,28 +229,6 @@ BSKTsetwindow(Client cntxt, MalBlkPtr mb
 }
 
 str
-BSKTwindow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{
-       str sch = *getArgReference_str(stk,pci,2);
-       str tbl = *getArgReference_str(stk,pci,3);
-       int idx;
-       str msg;
-
-       (void) cntxt;
-       (void) mb;
-       idx = BSKTlocate(sch, tbl);
-       if( idx == 0){
-               msg= BSKTregisterInternal(cntxt, mb, sch, tbl);
-               if( msg != MAL_SUCCEED)
-                       return msg;
-               idx = BSKTlocate(sch, tbl);
-               if( idx ==0)
-                       throw(SQL,"basket.window","Stream table %s.%s not 
accessible\n",sch,tbl);
-       }
-       throw(MAL,"basket.window","NYI");
-}
-
-str
 BSKTkeep(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        str sch = *getArgReference_str(stk,pci,1);
@@ -400,9 +372,7 @@ BSKTdrop(Client cntxt, MalBlkPtr mb, Mal
        bskt = BSKTlocate(sch,tbl);
        if (bskt == 0)
                throw(SQL, "basket.drop", "Could not find the basket 
%s.%s\n",sch,tbl);
-       MT_lock_set(&ttrLock);
        BSKTclean(bskt);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to