Changeset: 4f1f4ff67d1d for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=4f1f4ff67d1d
Modified Files:
sql/backends/monet5/Tests/basket00.stable.out
sql/backends/monet5/Tests/cquery00.sql
sql/backends/monet5/cquery.mal
sql/backends/monet5/sql_cquery.c
sql/backends/monet5/sql_cquery.h
sql/include/sql_catalog.h
sql/scripts/50_cquery.sql
sql/server/rel_updates.c
Branch: trails
Log Message:
Recovering IOT branch code
Compilation of procedures was missing
and isStream should be part of isTable macro.
diffs (264 lines):
diff --git a/sql/backends/monet5/Tests/basket00.stable.out
b/sql/backends/monet5/Tests/basket00.stable.out
--- a/sql/backends/monet5/Tests/basket00.stable.out
+++ b/sql/backends/monet5/Tests/basket00.stable.out
@@ -24,11 +24,17 @@ Ready.
# 13:08:27 > "mclient" "-lmal" "-ftest" "-Eutf-8"
"--host=/var/tmp/mtest-16206" "--port=39158"
# 13:08:27 >
-#--------------------------#
-# t t t t t t t t t t #
name
-# void timestamp str str str int int int int
str # type
-#--------------------------#
-#baskets table
+#create stream table s_tmp(i integer);
+#insert into s_tmp values(1),(2);
+[ 2 ]
+#select * from s_tmp;
+% sys.s_tmp # table_name
+% i # name
+% int # type
+% 1 # length
+[ 1 ]
+[ 2 ]
+#drop table s_tmp;
# 13:08:27 >
# 13:08:27 > "Done."
diff --git a/sql/backends/monet5/Tests/cquery00.sql
b/sql/backends/monet5/Tests/cquery00.sql
--- a/sql/backends/monet5/Tests/cquery00.sql
+++ b/sql/backends/monet5/Tests/cquery00.sql
@@ -1,5 +1,5 @@
create stream table testing (a int);
-insert into TESTING values(123);
+insert into testing values(123);
create table results (a int);
@@ -15,3 +15,6 @@ select * from results;
select * from functions wherE name = 'myfirstcq';
+drop procedure myfirstcq;
+drop table results;
+drop table testing;
diff --git a/sql/backends/monet5/cquery.mal b/sql/backends/monet5/cquery.mal
--- a/sql/backends/monet5/cquery.mal
+++ b/sql/backends/monet5/cquery.mal
@@ -17,9 +17,14 @@
module cquery;
+pattern registersql(mod:str, fcn:str)
+address CQprocedure
+comment "Add a continuous SQL procedure to the Petri-net scheduler. It will
analyse
+the MAL block to determine the input/output dependencies and firing
conditions.";
+
pattern register(mod:str, fcn:str)
address CQregister
-comment "Add a continuous MAL plan to the Petri-net scheduler. It will analyse
+comment "Add a continuous SQL procedure to the Petri-net scheduler. It will
analyse
the MAL block to determine the input/output dependencies and firing
conditions.";
pattern resume(mod:str, fcn:str)
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -372,7 +372,9 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
return msg;
}
-// locate the SQL procedure in the catalog
+/* locate the MAL representation of this operation and extract the flow */
+/* If the operation is not available yet, it should be compiled from its
+ definition retained in the SQL catalog */
static str
CQprocedureStmt(Client cntxt, MalBlkPtr mb, str schema, str nme)
{
@@ -424,9 +426,9 @@ CQregisterInternal(Client cntxt, str mod
return createException(SQL,"cquery.register","Could not find
SQL procedure");
mb = s->def;
- msg = CQprocedureStmt(cntxt, mb, modnme, fcnnme);
- if( msg)
- return msg;
+ //msg = CQprocedureStmt(cntxt, mb, modnme, fcnnme);
+ //if( msg)
+ //return msg;
sig = getInstrPtr(mb,0);
if (pnettop == MAXCQ)
@@ -459,6 +461,81 @@ CQregisterInternal(Client cntxt, str mod
}
str
+CQprocedure(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ str sch= NULL;
+ str nme= NULL;
+
+ Symbol s = NULL;
+ MalBlkPtr qry;
+ str msg = NULL;
+ InstrPtr p;
+ //Module scope;
+ int i;
+ char name[IDLENGTH];
+
+
+ /* check existing of the pre-compiled and activated function */
+ sch = *getArgReference_str(stk, pci, 1);
+ nme = *getArgReference_str(stk, pci, 2);
+ snprintf(name,IDLENGTH,"%s_%s",sch,nme);
+#ifdef DEBUG_CQUERY
+ fprintf(stderr,"#cq: register the continues procedure
%s.%s()\n",sch,nme);
+#endif
+
+ /* check existing of the pre-compiled function */
+#ifdef DEBUG_CQUERY
+ fprintf(stderr,"#cq: locate a SQL procedure %s.%s()\n",sch,nme);
+#endif
+ msg = CQprocedureStmt(cntxt, mb, sch, nme);
+ if (msg)
+ return msg;
+ s = findSymbolInModule(cntxt->nspace, putName(nme));
+ if (s == NULL)
+ throw(SQL, "cqeury.procedure", "Definition missing");
+ qry = s->def;
+
+ chkProgram(cntxt->fdout,cntxt->nspace,qry);
+ if( qry->errors)
+ msg = createException(SQL,"cquery.procedure","Error in
continuous procedure");
+
+#ifdef DEBUG_CQUERY
+ fprintf(stderr,"#cq: register a new continuous query plan\n");
+#endif
+ s = newFunction(userRef, putName(name), FUNCTIONsymbol);
+ if (s == NULL)
+ msg = createException(SQL, "cquery.procedure", "Procedure code
does not exist.");
+
+ freeMalBlk(s->def);
+ s->def = copyMalBlk(qry);
+ p = getInstrPtr(s->def, 0);
+ setModuleId(p,userRef);
+ setFunctionId(p, putName(name));
+ insertSymbol(cntxt->nspace, s);
+#ifdef DEBUG_CQUERY
+ printFunction(cntxt->fdout, s->def, 0, LIST_MAL_ALL);
+#endif
+ /* optimize the code and register at scheduler */
+ if (msg == MAL_SUCCEED)
+ addtoMalBlkHistory(mb);
+ if (msg == MAL_SUCCEED) {
+#ifdef DEBUG_CQUERY
+ fprintf(stderr,"#cq: continuous query plan\n");
+#endif
+ msg = CQregisterInternal(cntxt, userRef, putName(name));
+ }
+
+ // register all the baskets mentioned in the plan
+ for( i=1; i< s->def->stop;i++){
+ p= getInstrPtr(s->def,i);
+ if( getModuleId(p) == basketRef && getFunctionId(p)==
registerRef){
+ BSKTregister(cntxt,s->def,0,p);
+ }
+ }
+ return msg;
+}
+
+str
CQregisterMAL(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci )
{
int i;
@@ -1005,28 +1082,21 @@ CQscheduler(void *dummy)
str
CQstartScheduler(void)
{
+ MT_Id pid;
Client cntxt;
- MT_Id pid;
stream *fin, *fout;
#ifdef DEBUG_CQUERY
fprintf(stderr, "#Start CQscheduler\n");
#endif
- fin = open_rastream("cquery_in");
- if( fin == NULL)
- throw(MAL, "cquery.startScheduler","Could not create input
file");
- fout = open_wastream("cquery_out");
- if( fout == NULL)
- throw(MAL, "cquery.startScheduler","Could not create output
file");
+ fin = open_rastream("fin_petri_sched");
+ fout = open_wastream("fout_petri_sched");
cntxt = MCinitClient(0,bstream_create(fin,0),fout);
-/*
- cntxt = MCinitClient(0,0,0);
-*/
if( cntxt == NULL)
throw(MAL, "cquery.startScheduler","Could not initialize
CQscheduler");
if( SQLinitClient(cntxt) != MAL_SUCCEED)
- throw(MAL, "cquery.startScheduler","Could not initialize
CQscheduler");
+ throw(MAL, "cquery.startScheduler","Could not initialize SQL
context");
if (pnstatus== CQINIT && MT_create_thread(&pid, CQscheduler, (void*)
cntxt, MT_THR_JOINABLE) != 0){
#ifdef DEBUG_CQUERY
diff --git a/sql/backends/monet5/sql_cquery.h b/sql/backends/monet5/sql_cquery.h
--- a/sql/backends/monet5/sql_cquery.h
+++ b/sql/backends/monet5/sql_cquery.h
@@ -69,6 +69,7 @@ sql5_export int pnettop;
sql5_export MT_Lock ttrLock;
sql5_export str CQregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
+sql5_export str CQprocedure(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
sql5_export str CQregisterMAL(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci );
sql5_export str CQresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
sql5_export str CQpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
diff --git a/sql/include/sql_catalog.h b/sql/include/sql_catalog.h
--- a/sql/include/sql_catalog.h
+++ b/sql/include/sql_catalog.h
@@ -477,7 +477,7 @@ typedef enum table_types {
tt_replica_table = 6 /* multiple replica of the same table */
} table_types;
-#define isTable(x) (x->type==tt_table)
+#define isTable(x) (x->type==tt_table || x->type == tt_stream)
#define isView(x) (x->type==tt_view)
#define isMergeTable(x) (x->type==tt_merge_table)
#define isStream(x) (x->type==tt_stream)
diff --git a/sql/scripts/50_cquery.sql b/sql/scripts/50_cquery.sql
--- a/sql/scripts/50_cquery.sql
+++ b/sql/scripts/50_cquery.sql
@@ -20,7 +20,7 @@
create schema cquery;
create procedure cquery.register(sch string, cqname string)
- external name cquery.register;
+ external name cquery.registersql;
create procedure cquery.resume()
external name cquery.resume;
diff --git a/sql/server/rel_updates.c b/sql/server/rel_updates.c
--- a/sql/server/rel_updates.c
+++ b/sql/server/rel_updates.c
@@ -387,8 +387,8 @@ insert_allowed(mvc *sql, sql_table *t, c
return sql_error(sql, 02, "%s: cannot %s view '%s'", op,
opname, tname);
} else if (isMergeTable(t)) {
return sql_error(sql, 02, "%s: cannot %s merge table '%s'", op,
opname, tname);
- } else if (isStream(t)) {
- return sql_error(sql, 02, "%s: cannot %s stream '%s'", op,
opname, tname);
+ //} else if (isStream(t)) {
+ //return sql_error(sql, 02, "%s: cannot %s stream '%s'", op,
opname, tname);
} else if (t->access == TABLE_READONLY) {
return sql_error(sql, 02, "%s: cannot %s read only table '%s'",
op, opname, tname);
}
@@ -418,8 +418,8 @@ update_allowed(mvc *sql, sql_table *t, c
return sql_error(sql, 02, "%s: cannot %s view '%s'", op,
opname, tname);
} else if (isMergeTable(t)) {
return sql_error(sql, 02, "%s: cannot %s merge table '%s'", op,
opname, tname);
- } else if (isStream(t)) {
- return sql_error(sql, 02, "%s: cannot %s stream '%s'", op,
opname, tname);
+ //} else if (isStream(t)) {
+ //return sql_error(sql, 02, "%s: cannot %s stream '%s'", op,
opname, tname);
} else if (t->access == TABLE_READONLY || t->access ==
TABLE_APPENDONLY) {
return sql_error(sql, 02, "%s: cannot %s read or append only
table '%s'", op, opname, tname);
}
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list