Changeset: 4f1f4ff67d1d for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=4f1f4ff67d1d
Modified Files:
        sql/backends/monet5/Tests/basket00.stable.out
        sql/backends/monet5/Tests/cquery00.sql
        sql/backends/monet5/cquery.mal
        sql/backends/monet5/sql_cquery.c
        sql/backends/monet5/sql_cquery.h
        sql/include/sql_catalog.h
        sql/scripts/50_cquery.sql
        sql/server/rel_updates.c
Branch: trails
Log Message:

Recovering IOT branch code
Compilation of procedures was missing
and isStream should be part of isTable macro.


diffs (264 lines):

diff --git a/sql/backends/monet5/Tests/basket00.stable.out 
b/sql/backends/monet5/Tests/basket00.stable.out
--- a/sql/backends/monet5/Tests/basket00.stable.out
+++ b/sql/backends/monet5/Tests/basket00.stable.out
@@ -24,11 +24,17 @@ Ready.
 # 13:08:27 >  "mclient" "-lmal" "-ftest" "-Eutf-8" 
"--host=/var/tmp/mtest-16206" "--port=39158"
 # 13:08:27 >  
 
-#--------------------------#
-# t    t       t       t       t       t       t       t       t       t  # 
name
-# void timestamp       str     str     str     int     int     int     int     
str  # type
-#--------------------------#
-#baskets table
+#create stream table s_tmp(i integer);
+#insert into s_tmp values(1),(2);
+[ 2    ]
+#select * from s_tmp;
+% sys.s_tmp # table_name
+% i # name
+% int # type
+% 1 # length
+[ 1    ]
+[ 2    ]
+#drop table s_tmp;
 
 # 13:08:27 >  
 # 13:08:27 >  "Done."
diff --git a/sql/backends/monet5/Tests/cquery00.sql 
b/sql/backends/monet5/Tests/cquery00.sql
--- a/sql/backends/monet5/Tests/cquery00.sql
+++ b/sql/backends/monet5/Tests/cquery00.sql
@@ -1,5 +1,5 @@
 create stream table testing (a int);
-insert into TESTING values(123);
+insert into testing values(123);
 
 create table results (a int);
 
@@ -15,3 +15,6 @@ select * from results;
 
 select * from functions wherE name = 'myfirstcq';
 
+drop procedure myfirstcq;
+drop table results;
+drop table testing;
diff --git a/sql/backends/monet5/cquery.mal b/sql/backends/monet5/cquery.mal
--- a/sql/backends/monet5/cquery.mal
+++ b/sql/backends/monet5/cquery.mal
@@ -17,9 +17,14 @@
 
 module cquery;
 
+pattern registersql(mod:str, fcn:str)
+address CQprocedure
+comment "Add a continuous SQL procedure to the Petri-net scheduler. It will 
analyse
+the MAL block to determine the input/output dependencies and firing 
conditions.";
+
 pattern register(mod:str, fcn:str)
 address CQregister
-comment "Add a continuous MAL plan to the Petri-net scheduler. It will analyse
+comment "Add a continuous SQL procedure to the Petri-net scheduler. It will 
analyse
 the MAL block to determine the input/output dependencies and firing 
conditions.";
 
 pattern resume(mod:str, fcn:str)
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -372,7 +372,9 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
        return msg;
 }
 
-// locate the SQL procedure in the catalog
+/* locate the MAL representation of this operation and extract the flow */
+/* If the operation is not available yet, it should be compiled from its
+   definition retained in the SQL catalog */
 static str
 CQprocedureStmt(Client cntxt, MalBlkPtr mb, str schema, str nme)
 {
@@ -424,9 +426,9 @@ CQregisterInternal(Client cntxt, str mod
                return createException(SQL,"cquery.register","Could not find 
SQL procedure");
 
        mb = s->def;
-       msg = CQprocedureStmt(cntxt, mb, modnme, fcnnme);
-       if( msg)
-               return msg;
+       //msg = CQprocedureStmt(cntxt, mb, modnme, fcnnme);
+       //if( msg)
+               //return msg;
        sig = getInstrPtr(mb,0);
 
        if (pnettop == MAXCQ) 
@@ -459,6 +461,81 @@ CQregisterInternal(Client cntxt, str mod
 }
 
 str
+CQprocedure(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       str sch= NULL;
+       str nme= NULL;
+
+       Symbol s = NULL;
+       MalBlkPtr qry;
+       str msg = NULL;
+       InstrPtr p;
+       //Module scope;
+       int i;
+       char name[IDLENGTH];
+
+
+       /* check existing of the pre-compiled and activated function */
+       sch = *getArgReference_str(stk, pci, 1);
+       nme = *getArgReference_str(stk, pci, 2);
+       snprintf(name,IDLENGTH,"%s_%s",sch,nme);
+#ifdef DEBUG_CQUERY
+       fprintf(stderr,"#cq: register the continues procedure 
%s.%s()\n",sch,nme);
+#endif
+
+       /* check existing of the pre-compiled function */
+#ifdef DEBUG_CQUERY
+       fprintf(stderr,"#cq: locate a SQL procedure %s.%s()\n",sch,nme);
+#endif
+       msg = CQprocedureStmt(cntxt, mb, sch, nme);
+       if (msg)
+               return msg;
+       s = findSymbolInModule(cntxt->nspace, putName(nme));
+       if (s == NULL)
+               throw(SQL, "cqeury.procedure", "Definition missing");
+       qry = s->def;
+
+       chkProgram(cntxt->fdout,cntxt->nspace,qry);
+       if( qry->errors)
+               msg = createException(SQL,"cquery.procedure","Error in 
continuous procedure");
+
+#ifdef DEBUG_CQUERY
+       fprintf(stderr,"#cq: register a new continuous query plan\n");
+#endif
+       s = newFunction(userRef, putName(name), FUNCTIONsymbol);
+       if (s == NULL)
+               msg = createException(SQL, "cquery.procedure", "Procedure code 
does not exist.");
+
+       freeMalBlk(s->def);
+       s->def = copyMalBlk(qry);
+       p = getInstrPtr(s->def, 0);
+       setModuleId(p,userRef);
+       setFunctionId(p, putName(name));
+       insertSymbol(cntxt->nspace, s);
+#ifdef DEBUG_CQUERY
+       printFunction(cntxt->fdout, s->def, 0, LIST_MAL_ALL);
+#endif
+       /* optimize the code and register at scheduler */
+       if (msg == MAL_SUCCEED) 
+               addtoMalBlkHistory(mb);
+       if (msg == MAL_SUCCEED) {
+#ifdef DEBUG_CQUERY
+               fprintf(stderr,"#cq: continuous query plan\n");
+#endif
+               msg = CQregisterInternal(cntxt, userRef, putName(name));
+       }
+
+       // register all the baskets mentioned in the plan
+       for( i=1; i< s->def->stop;i++){
+               p= getInstrPtr(s->def,i);
+               if( getModuleId(p) == basketRef && getFunctionId(p)== 
registerRef){
+                       BSKTregister(cntxt,s->def,0,p);
+               }
+       }
+       return msg;
+}
+
+str
 CQregisterMAL(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci )
 {
        int i;
@@ -1005,28 +1082,21 @@ CQscheduler(void *dummy)
 str
 CQstartScheduler(void)
 {
+       MT_Id pid;
        Client cntxt;
-       MT_Id pid;
        stream *fin, *fout;
 
 #ifdef DEBUG_CQUERY
        fprintf(stderr, "#Start CQscheduler\n");
 #endif
-       fin =  open_rastream("cquery_in");
-       if( fin == NULL)
-               throw(MAL, "cquery.startScheduler","Could not create input 
file");
-       fout =  open_wastream("cquery_out");
-       if( fout == NULL)
-               throw(MAL, "cquery.startScheduler","Could not create output 
file");
+       fin =  open_rastream("fin_petri_sched");
+       fout =  open_wastream("fout_petri_sched");
        cntxt = MCinitClient(0,bstream_create(fin,0),fout);
        
-/*
-       cntxt = MCinitClient(0,0,0);
-*/
        if( cntxt == NULL)
                throw(MAL, "cquery.startScheduler","Could not initialize 
CQscheduler");
        if( SQLinitClient(cntxt) != MAL_SUCCEED)
-               throw(MAL, "cquery.startScheduler","Could not initialize 
CQscheduler");
+               throw(MAL, "cquery.startScheduler","Could not initialize SQL 
context");
 
        if (pnstatus== CQINIT && MT_create_thread(&pid, CQscheduler, (void*) 
cntxt, MT_THR_JOINABLE) != 0){
 #ifdef DEBUG_CQUERY
diff --git a/sql/backends/monet5/sql_cquery.h b/sql/backends/monet5/sql_cquery.h
--- a/sql/backends/monet5/sql_cquery.h
+++ b/sql/backends/monet5/sql_cquery.h
@@ -69,6 +69,7 @@ sql5_export int pnettop;
 sql5_export MT_Lock ttrLock;
 
 sql5_export str CQregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
+sql5_export str CQprocedure(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 sql5_export str CQregisterMAL(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci );
 sql5_export str CQresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 sql5_export str CQpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
diff --git a/sql/include/sql_catalog.h b/sql/include/sql_catalog.h
--- a/sql/include/sql_catalog.h
+++ b/sql/include/sql_catalog.h
@@ -477,7 +477,7 @@ typedef enum table_types {
        tt_replica_table = 6    /* multiple replica of the same table */
 } table_types;
 
-#define isTable(x)       (x->type==tt_table)
+#define isTable(x)       (x->type==tt_table || x->type == tt_stream)
 #define isView(x)        (x->type==tt_view)
 #define isMergeTable(x)   (x->type==tt_merge_table)
 #define isStream(x)      (x->type==tt_stream)
diff --git a/sql/scripts/50_cquery.sql b/sql/scripts/50_cquery.sql
--- a/sql/scripts/50_cquery.sql
+++ b/sql/scripts/50_cquery.sql
@@ -20,7 +20,7 @@
 create schema cquery;
 
 create procedure cquery.register(sch string, cqname string)
-       external name cquery.register;
+       external name cquery.registersql;
 
 create procedure cquery.resume()
        external name cquery.resume;
diff --git a/sql/server/rel_updates.c b/sql/server/rel_updates.c
--- a/sql/server/rel_updates.c
+++ b/sql/server/rel_updates.c
@@ -387,8 +387,8 @@ insert_allowed(mvc *sql, sql_table *t, c
                return sql_error(sql, 02, "%s: cannot %s view '%s'", op, 
opname, tname);
        } else if (isMergeTable(t)) {
                return sql_error(sql, 02, "%s: cannot %s merge table '%s'", op, 
opname, tname);
-       } else if (isStream(t)) {
-               return sql_error(sql, 02, "%s: cannot %s stream '%s'", op, 
opname, tname);
+       //} else if (isStream(t)) {
+               //return sql_error(sql, 02, "%s: cannot %s stream '%s'", op, 
opname, tname);
        } else if (t->access == TABLE_READONLY) {
                return sql_error(sql, 02, "%s: cannot %s read only table '%s'", 
op, opname, tname);
        }
@@ -418,8 +418,8 @@ update_allowed(mvc *sql, sql_table *t, c
                return sql_error(sql, 02, "%s: cannot %s view '%s'", op, 
opname, tname);
        } else if (isMergeTable(t)) {
                return sql_error(sql, 02, "%s: cannot %s merge table '%s'", op, 
opname, tname);
-       } else if (isStream(t)) {
-               return sql_error(sql, 02, "%s: cannot %s stream '%s'", op, 
opname, tname);
+       //} else if (isStream(t)) {
+               //return sql_error(sql, 02, "%s: cannot %s stream '%s'", op, 
opname, tname);
        } else if (t->access == TABLE_READONLY || t->access == 
TABLE_APPENDONLY) {
                return sql_error(sql, 02, "%s: cannot %s read or append only 
table '%s'", op, opname, tname);
        }
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to