Changeset: bb0653af7925 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=bb0653af7925
Modified Files:
monetdb5/mal/mal_instruction.c
monetdb5/optimizer/opt_cquery.c
monetdb5/optimizer/opt_support.c
sql/backends/monet5/51_basket.mal
sql/backends/monet5/Tests/basket00.malC
sql/backends/monet5/Tests/cqstream00.sql
sql/backends/monet5/sql_basket.c
sql/backends/monet5/sql_basket.h
sql/backends/monet5/sql_cquery.c
sql/backends/monet5/sql_cquery.h
sql/scripts/50_cquery.sql
Branch: timetrails
Log Message:
Synce before cleanup of cquery admin
diffs (truncated from 797 to 300 lines):
diff --git a/monetdb5/mal/mal_instruction.c b/monetdb5/mal/mal_instruction.c
--- a/monetdb5/mal/mal_instruction.c
+++ b/monetdb5/mal/mal_instruction.c
@@ -151,7 +151,6 @@ resizeMalBlk(MalBlkPtr mb, int elements)
{
int i;
- assert(mb->vsize >= mb->ssize);
if( elements > mb->ssize){
InstrPtr *ostmt = mb->stmt;
mb->stmt = (InstrPtr *) GDKrealloc(mb->stmt, elements *
sizeof(InstrPtr));
diff --git a/monetdb5/optimizer/opt_cquery.c b/monetdb5/optimizer/opt_cquery.c
--- a/monetdb5/optimizer/opt_cquery.c
+++ b/monetdb5/optimizer/opt_cquery.c
@@ -38,6 +38,8 @@
fnd= 1; break;\
}
+#define DEBUG_OPT_CQUERY
+
str
OPTcqueryImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci)
{
@@ -78,11 +80,11 @@ OPTcqueryImplementation(Client cntxt, Ma
for (i = 1; i < limit && btop <MAXBSKT; i++){
p = old[i];
if( getModuleId(p)== basketRef && (getFunctionId(p)==
registerRef || getFunctionId(p)== bindRef ) ){
-#ifdef DEBUG_OPT_CQUERY
- mnstr_printf(cntxt->fdout, "#cquery stream table
%s.%s\n", getModuleId(p), getFunctionId(p));
-#endif
schemas[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
tables[btop]= getVarConstant(mb, getArg(p,3)).val.sval;
+#ifdef DEBUG_OPT_CQUERY
+ mnstr_printf(cntxt->fdout, "#cquery bind stream table
%s.%s\n", schemas[btop], tables[btop]);
+#endif
for( j =0; j< btop ; j++)
if( strcmp(schemas[j], schemas[btop])==0 &&
strcmp(tables[j],tables[btop]) ==0)
break;
@@ -91,11 +93,11 @@ OPTcqueryImplementation(Client cntxt, Ma
btop++;
}
if( getModuleId(p)== basketRef && getFunctionId(p) == appendRef
){
-#ifdef DEBUG_OPT_CQUERY
- mnstr_printf(cntxt->fdout, "#cquery stream table
%s.%s\n", getModuleId(p), getFunctionId(p));
-#endif
schemas[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
tables[btop]= getVarConstant(mb, getArg(p,3)).val.sval;
+#ifdef DEBUG_OPT_CQUERY
+ mnstr_printf(cntxt->fdout, "#cquery append stream table
%s.%s\n", schemas[btop], tables[btop]);
+#endif
for( j =0; j< btop ; j++)
if( strcmp(schemas[j], schemas[btop])==0 &&
strcmp(tables[j],tables[btop]) ==0)
break;
@@ -105,11 +107,11 @@ OPTcqueryImplementation(Client cntxt, Ma
btop++;
}
if( getModuleId(p)== basketRef && getFunctionId(p) == updateRef
){
-#ifdef DEBUG_OPT_CQUERY
- mnstr_printf(cntxt->fdout, "#cquery stream table
%s.%s\n", getModuleId(p), getFunctionId(p));
-#endif
schemas[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
tables[btop]= getVarConstant(mb, getArg(p,3)).val.sval;
+#ifdef DEBUG_OPT_CQUERY
+ mnstr_printf(cntxt->fdout, "#cquery update stream table
%s.%s\n", schemas[btop], tables[btop]);
+#endif
for( j =0; j< btop ; j++)
if( strcmp(schemas[j], schemas[btop])==0 &&
strcmp(tables[j],tables[btop]) ==0)
break;
@@ -119,11 +121,11 @@ OPTcqueryImplementation(Client cntxt, Ma
btop++;
}
if( getModuleId(p)== cqueryRef && getFunctionId(p) ==
basketRef){
-#ifdef DEBUG_OPT_CQUERY
- mnstr_printf(cntxt->fdout, "#cquery stream table
%s.%s\n", getModuleId(p), getFunctionId(p));
-#endif
schemas[btop]= getVarConstant(mb, getArg(p,1)).val.sval;
tables[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
+#ifdef DEBUG_OPT_CQUERY
+ mnstr_printf(cntxt->fdout, "#cquery basket ref
%s.%s\n", schemas[btop],tables[btop]);
+#endif
for( j =0; j< btop ; j++)
if( strcmp(schemas[j], schemas[btop])==0 &&
strcmp(tables[j],tables[btop]) ==0)
break;
@@ -138,13 +140,12 @@ OPTcqueryImplementation(Client cntxt, Ma
lastmvc = getArg(p,1);
}
}
+#ifdef DEBUG_OPT_CQUERY
+ mnstr_printf(cntxt->fdout, "#cquery optimizer started with %d streams,
mvc %d\n", btop,lastmvc);
+#endif
if( btop == MAXBSKT || btop == 0)
return MAL_SUCCEED;
-#ifdef DEBUG_OPT_CQUERY
- mnstr_printf(cntxt->fdout, "#cquery optimizer started with %d streams,
mvc %d\n", btop,lastmvc);
- printFunction(cntxt->fdout, mb, stk, LIST_MAL_DEBUG);
-#endif
(void) stk;
alias = (int *) GDKzalloc(mb->vtop * 2 * sizeof(int));
diff --git a/monetdb5/optimizer/opt_support.c b/monetdb5/optimizer/opt_support.c
--- a/monetdb5/optimizer/opt_support.c
+++ b/monetdb5/optimizer/opt_support.c
@@ -446,6 +446,8 @@ hasSideEffects(MalBlkPtr mb, InstrPtr p,
return TRUE;
if ( getModuleId(p) == remoteRef)
return TRUE;
+ if ( getModuleId(p) == cqueryRef)
+ return TRUE;
return FALSE;
}
diff --git a/sql/backends/monet5/51_basket.mal
b/sql/backends/monet5/51_basket.mal
--- a/sql/backends/monet5/51_basket.mal
+++ b/sql/backends/monet5/51_basket.mal
@@ -45,21 +45,17 @@ comment "Remove tuples from a basket";
pattern clear_table(sname:str,tname:str):lng
address mvc_clear_table_wrap;
-pattern setwindow(sch:str, tbl:str, elm:int):int
-address BSKTsetwindow
+pattern window(sch:str, tbl:str, elm:int):int
+address BSKTwindow
comment "Set window size";
-pattern window(mvc:any, sch:str, tbl:str):int
+pattern window(sch:str, tbl:str, elm:int, stride:int):int
address BSKTwindow
-comment "Apply window selection";
+comment "Set window size and stride";
-pattern settumble(sch:str, tbl:str, elm:int):int
-address BSKTsettumble
-comment "Set stride";
-
-pattern tumble(mvc:any, sch:str, tbl:str):int
+unsafe pattern tumble(mvc:any, sch:str, tbl:str):int
address BSKTtumble
-comment "Apply tumbling to the basket";
+comment "Eat away and slide forward";
unsafe pattern lock(mvc:any, sch:str, tbl:str):int
address BSKTlock
@@ -81,7 +77,7 @@ pattern drop(mvc:int,sch:str,tbl:str):in
address BSKTdrop
comment "Remove the basket";
-pattern status()
(seen:bat[:timestamp],sch:bat[:str],tbl:bat[:str],state:bat[:str],
window:bat[:int], stride:bat[:int],events:bat[:int],cycles:bat[:int],
error:bat[:str])
+pattern status() (seen:bat[:timestamp],sch:bat[:str],tbl:bat[:str],
window:bat[:int], stride:bat[:int],events:bat[:int],cycles:bat[:int],
error:bat[:str])
address BSKTstatus
comment "Show the status of the baskets";
diff --git a/sql/backends/monet5/Tests/basket00.malC
b/sql/backends/monet5/Tests/basket00.malC
--- a/sql/backends/monet5/Tests/basket00.malC
+++ b/sql/backends/monet5/Tests/basket00.malC
@@ -4,4 +4,6 @@
io.print(seen,sch,tbl,state,window,stride,events,cycles,error);
+basket.window("sys","tables",2,1);
+
basket.dump();
diff --git a/sql/backends/monet5/Tests/cqstream00.sql
b/sql/backends/monet5/Tests/cqstream00.sql
--- a/sql/backends/monet5/Tests/cqstream00.sql
+++ b/sql/backends/monet5/Tests/cqstream00.sql
@@ -1,21 +1,19 @@
-- Example of a stream splitter
create stream table stmp2 (t timestamp, sensor integer, val decimal(8,2)) ;
+-- SET WINDOW 2 STRIDE 1
+call cquery.window('sys','stmp2',2,1); -- consume 2 tuples and tumble 1 from
this stream
+
create table result1(like stmp2);
create table result2(like stmp2);
-- CREATE CONTINUOUS QUERY cq_splitter
create procedure cq_splitter()
begin
- -- If you use a stream table there should be a tumble option being set
(by default all)
- call cquery.tumble('sys','stmp2',1); -- consume one tuple at a time
insert into result1 select * from stmp2 where val <12;
insert into result2 select * from stmp2 where val >12;
end;
call cquery.register('sys','cq_splitter');
-
--- The stream use in the CQ determines the activation and sets the scheduler
heartbeat to -1
--- If set explictly it overrules the window based bounds
-call cquery.heartbeat(-1);
+select * from cquery.streams();
insert into stmp2 values('2005-09-23 12:34:26.000',1,11.0);
insert into stmp2 values('2005-09-23 12:34:27.000',1,11.0);
@@ -36,6 +34,7 @@ select * from stmp2;
select * from result1;
select * from result2;
+select * from cquery.status();
select * from cquery.log();
-- ideally auto remove upon dropping the procedure
diff --git a/sql/backends/monet5/sql_basket.c b/sql/backends/monet5/sql_basket.c
--- a/sql/backends/monet5/sql_basket.c
+++ b/sql/backends/monet5/sql_basket.c
@@ -34,15 +34,13 @@
#include "mal_builder.h"
#include "opt_prelude.h"
-#define _DEBUG_BASKET_ if(0)
+#define _DEBUG_BASKET_ if(1)
-str statusname[3] = { "<unknown>", "waiting", "filled" };
-
-static BasketRec *baskets; /* the global timetrails catalog */
-static int bsktTop = 0, bsktLimit = 0;
+BasketRec *baskets; /* the global timetrails catalog */
+int bsktTop = 0, bsktLimit = 0;
// locate the basket in the basket catalog
-static int
+int
BSKTlocate(str sch, str tbl)
{
int i;
@@ -125,7 +123,6 @@ BSKTnewbasket(mvc *m, sql_schema *s, sql
if( !isStream(t))
throw(MAL,"basket.register","Only allowed for stream tables");
- MT_lock_set(&ttrLock);
idx = BSKTnewEntry();
baskets[idx].schema = GDKstrdup(s->base.name);
@@ -137,10 +134,8 @@ BSKTnewbasket(mvc *m, sql_schema *s, sql
sql_column *col = o->data;
int tpe = col->type.type->localtype;
- if ( !(tpe <= TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime ||
tpe == TYPE_timestamp) ){
- MT_lock_unset(&ttrLock);
+ if ( !(tpe <= TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime ||
tpe == TYPE_timestamp) )
throw(MAL,"basket.register","Unsupported type
%d\n",tpe);
- }
colcnt++;
}
if( colcnt == MAXCOLS-1){
@@ -157,12 +152,11 @@ BSKTnewbasket(mvc *m, sql_schema *s, sql
baskets[idx].bats[i]= b;
baskets[idx].cols[i++]= GDKstrdup(col->base.name);
}
- MT_lock_unset(&ttrLock);
return MAL_SUCCEED;
}
// MAL/SQL interface for registration of a single table
-static str
+str
BSKTregisterInternal(Client cntxt, MalBlkPtr mb, str sch, str tbl)
{
sql_schema *s;
@@ -206,7 +200,7 @@ BSKTregister(Client cntxt, MalBlkPtr mb,
}
str
-BSKTsetwindow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+BSKTwindow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
str sch = *getArgReference_str(stk,pci,1);
str tbl = *getArgReference_str(stk,pci,2);
@@ -235,28 +229,6 @@ BSKTsetwindow(Client cntxt, MalBlkPtr mb
}
str
-BSKTwindow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{
- str sch = *getArgReference_str(stk,pci,2);
- str tbl = *getArgReference_str(stk,pci,3);
- int idx;
- str msg;
-
- (void) cntxt;
- (void) mb;
- idx = BSKTlocate(sch, tbl);
- if( idx == 0){
- msg= BSKTregisterInternal(cntxt, mb, sch, tbl);
- if( msg != MAL_SUCCEED)
- return msg;
- idx = BSKTlocate(sch, tbl);
- if( idx ==0)
- throw(SQL,"basket.window","Stream table %s.%s not
accessible\n",sch,tbl);
- }
- throw(MAL,"basket.window","NYI");
-}
-
-str
BSKTkeep(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
str sch = *getArgReference_str(stk,pci,1);
@@ -400,9 +372,7 @@ BSKTdrop(Client cntxt, MalBlkPtr mb, Mal
bskt = BSKTlocate(sch,tbl);
if (bskt == 0)
throw(SQL, "basket.drop", "Could not find the basket
%s.%s\n",sch,tbl);
- MT_lock_set(&ttrLock);
BSKTclean(bskt);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list