Changeset: b5cd1b3dadf7 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b5cd1b3dadf7
Modified Files:
monetdb5/optimizer/opt_prelude.c
monetdb5/optimizer/opt_prelude.h
sql/backends/monet5/cquery.mal
sql/backends/monet5/sql_cat.c
sql/backends/monet5/sql_cat.h
sql/backends/monet5/sql_cquery.c
sql/backends/monet5/sql_cquery.h
sql/backends/monet5/sql_execute.c
sql/backends/monet5/sql_scenario.c
sql/backends/monet5/sql_statement.c
sql/backends/monet5/sqlcatalog.mal
sql/include/sql_catalog.h
sql/server/rel_psm.c
sql/server/sql_mvc.h
sql/server/sql_qc.c
sql/server/sql_qc.h
Branch: trails
Log Message:
Major change: removed the CQ-related variables from sql_mvc that were used
during the compilation process. Relying on these variables would cause problems
when compiling UDFs that contain CQ calls. CQ registration is now properly
compiled into a SQL statement.
diffs (truncated from 707 to 300 lines):
diff --git a/monetdb5/optimizer/opt_prelude.c b/monetdb5/optimizer/opt_prelude.c
--- a/monetdb5/optimizer/opt_prelude.c
+++ b/monetdb5/optimizer/opt_prelude.c
@@ -262,6 +262,7 @@ str sqlcatalogRef;
str sqlRef;
str startRef;
str starttraceRef;
+str start_cpRef;
str stopRef;
str stoptraceRef;
str streamsRef;
@@ -567,6 +568,7 @@ void optimizerInit(void)
streamsRef = putName("streams");
startRef = putName("start");
starttraceRef = putName("starttrace");
+ start_cpRef = putName("start_cp");
stopRef = putName("stop");
stoptraceRef = putName("stoptrace");
strRef = putName("str");
diff --git a/monetdb5/optimizer/opt_prelude.h b/monetdb5/optimizer/opt_prelude.h
--- a/monetdb5/optimizer/opt_prelude.h
+++ b/monetdb5/optimizer/opt_prelude.h
@@ -261,6 +261,7 @@ mal_export str sqlcatalogRef;
mal_export str sqlRef;
mal_export str startRef;
mal_export str starttraceRef;
+mal_export str start_cpRef;
mal_export str stopRef;
mal_export str stoptraceRef;
mal_export str streamsRef;
diff --git a/sql/backends/monet5/cquery.mal b/sql/backends/monet5/cquery.mal
--- a/sql/backends/monet5/cquery.mal
+++ b/sql/backends/monet5/cquery.mal
@@ -17,11 +17,6 @@
module cquery;
-pattern register(mod:str, fcn:str)
-address CQregister
-comment "Add a continuous SQL procedure to the Petri-net scheduler. It will
analyse
-the MAL block to determine the input/output dependencies and firing
conditions.";
-
pattern wait(cnt:int)
address CQwait
comment "Sleep for some time";
diff --git a/sql/backends/monet5/sql_cat.c b/sql/backends/monet5/sql_cat.c
--- a/sql/backends/monet5/sql_cat.c
+++ b/sql/backends/monet5/sql_cat.c
@@ -427,6 +427,12 @@ drop_index(Client cntxt, mvc *sql, char
}
static str
+start_cp(Client cntxt, str alias, int action, lng heartbeat, lng startat, int
cycles, MalBlkPtr fcall)
+{
+ return CQregister(cntxt, alias, action, heartbeat, startat, cycles,
fcall);
+}
+
+static str
change_single_cp(str alias, int action, lng heartbeat, lng startat, int cycles)
{
if(action & mod_resume_continuous) {
@@ -1300,7 +1306,23 @@ SQLdrop_index(Client cntxt, MalBlkPtr mb
msg = drop_index(cntxt, sql, sname, iname);
return msg;
}
-//alias:str, action:int, heartbeats:lng, startat:lng, cycles:int)
+
+str
+SQLstart_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{ mvc *sql = NULL;
+ str msg;
+ str alias = *getArgReference_str(stk, pci, 1);
+ int action = *getArgReference_int(stk, pci, 2);
+ lng heartbeat = *getArgReference_lng(stk, pci, 3);
+ lng startat = *getArgReference_lng(stk, pci, 4);
+ int cycles = *getArgReference_int(stk, pci, 5);
+ MalBlkPtr fcall = *(MalBlkPtr*) getArgReference(stk, pci, 6);
+
+ initcontext();
+ msg = start_cp(cntxt, alias, action, heartbeat, startat, cycles, fcall);
+ return msg;
+}
+
str
SQLchange_single_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{ mvc *sql = NULL;
diff --git a/sql/backends/monet5/sql_cat.h b/sql/backends/monet5/sql_cat.h
--- a/sql/backends/monet5/sql_cat.h
+++ b/sql/backends/monet5/sql_cat.h
@@ -56,6 +56,7 @@ sql5_export str SQLrename_user(Client cn
sql5_export str SQLcreate_role(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci) ;
sql5_export str SQLdrop_role(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci) ;
sql5_export str SQLdrop_index(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci) ;
+sql5_export str SQLstart_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
sql5_export str SQLchange_single_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
sql5_export str SQLchange_all_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
sql5_export str SQLdrop_function(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci) ;
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -434,39 +434,14 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
* The actual function is called with the arguments provided in the call.
*/
str
-CQregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci )
+CQregister(Client cntxt, str alias, int which, lng heartbeats, lng startat,
int cycles, MalBlkPtr mb)
{
- str msg = MAL_SUCCEED, alias = NULL;
- InstrPtr sig = getInstrPtr(mb,0),q;
- MalBlkPtr other;
+ str msg = MAL_SUCCEED;
+ InstrPtr sig, q;
Symbol s;
CQnode *pnew;
- backend *be = (backend *) cntxt->sqlcontext;
- mvc* sqlcontext;
- const char* err_message = "procedure";
- int i, j, is_function = 0, cycles = DEFAULT_CP_CYCLES, idx;
- size_t ttlen = 0;
- lng heartbeats = DEFAULT_CP_HEARTBEAT, startat = 0;
-
- (void) stk;
- (void) pci;
-
- if(be){
- sqlcontext = be->mvc;
- if(sqlcontext->continuous & mod_continuous_function)
- err_message = "function";
- cycles = sqlcontext->cycles;
- startat = sqlcontext->startat;
- heartbeats = sqlcontext->heartbeats;
- is_function = (sqlcontext->continuous &
mod_continuous_function);
- if(sqlcontext->cq_alias) {
- alias = GDKstrdup(sqlcontext->cq_alias);
- if( alias == NULL) {
- msg =
createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL);
- goto finish;
- }
- }
- }
+ const char* err_message = (which & mod_continuous_function) ?
"function" : "procedure";
+ int i, j, idx;
if(cycles < 0 && cycles != CYCLES_NIL){
msg = createException(SQL,"cquery.register",SQLSTATE(42000)
"The cycles value must be non negative\n");
@@ -481,7 +456,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M
goto finish;
}
- if(is_function){ /* for functions we need to remove the sql.mvc
instruction */
+ if(which & mod_continuous_function){ /* for functions we need to remove
the sql.mvc instruction */
for(i = 1; i< mb->stop; i++){
sig= getInstrPtr(mb,i);
if( getFunctionId(sig) == mvcRef){
@@ -502,14 +477,14 @@ CQregister(Client cntxt, MalBlkPtr mb, M
goto finish;
}
- if(!alias) {
- ttlen = strlen(getFunctionId(sig)) + 1; //plus the null
character
- alias = GDKmalloc(ttlen);
- if( alias == NULL) {
- msg =
createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL);
- goto finish;
- }
- snprintf(alias, ttlen, "%s", getFunctionId(sig));
+ if(!alias || strcmp(alias, str_nil) == 0) {
+ alias = GDKstrdup(getFunctionId(sig));
+ } else {
+ alias = GDKstrdup(alias);
+ }
+ if( alias == NULL) {
+ msg = createException(SQL,"cquery.register",SQLSTATE(HY001)
MAL_MALLOC_FAIL);
+ goto finish;
}
#ifdef DEBUG_CQUERY
@@ -522,6 +497,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M
if( pnew == NULL) {
msg =
createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL);
GDKfree(alias);
+ freeMalBlk(mb);
goto unlock;
}
pnetLimit = INITIAL_MAXCQ;
@@ -532,6 +508,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M
if( pnew == NULL) {
msg =
createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL);
GDKfree(alias);
+ freeMalBlk(mb);
goto unlock;
}
pnetLimit += INITIAL_MAXCQ;
@@ -543,6 +520,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M
msg = createException(SQL,"cquery.register",SQLSTATE(3F000)
"The continuous %s %s is already registered.\n",
err_message, alias);
GDKfree(alias);
+ //freeMalBlk(mb); do not mess with caches!!
goto unlock;
}
@@ -552,10 +530,12 @@ CQregister(Client cntxt, MalBlkPtr mb, M
msg = createException(SQL,"cquery.register",SQLSTATE(3F000)
"Cannot find %s %s.%s.\n",
err_message,
getModuleId(sig), getFunctionId(sig));
GDKfree(alias);
+ freeMalBlk(mb);
goto unlock;
}
if((msg = CQanalysis(cntxt, s->def, pnettop)) != MAL_SUCCEED) {
GDKfree(alias);
+ freeMalBlk(mb);
goto unlock;
}
if(heartbeats != HEARTBEAT_NIL) {
@@ -569,37 +549,31 @@ CQregister(Client cntxt, MalBlkPtr mb, M
}
}
- other = copyMalBlk(mb);
- if(other == NULL) {
- msg = createException(SQL,"cquery.register",SQLSTATE(HY001)
MAL_MALLOC_FAIL);
- GDKfree(alias);
- goto unlock;
- }
- q = newStmt(other, sqlRef, transactionRef);
+ q = newStmt(mb, sqlRef, transactionRef);
if(q == NULL) {
msg = createException(SQL,"cquery.register",SQLSTATE(HY001)
MAL_MALLOC_FAIL);
GDKfree(alias);
- freeMalBlk(other);
+ freeMalBlk(mb);
goto unlock;
}
- setArgType(other,q, 0, TYPE_void);
- moveInstruction(other, getPC(other,q),i);
- q = newStmt(other, sqlRef, commitRef);
+ setArgType(mb,q, 0, TYPE_void);
+ moveInstruction(mb, getPC(mb,q),i);
+ q = newStmt(mb, sqlRef, commitRef);
if(q == NULL) {
msg = createException(SQL,"cquery.register",SQLSTATE(HY001)
MAL_MALLOC_FAIL);
GDKfree(alias);
- freeMalBlk(other);
+ freeMalBlk(mb);
goto unlock;
}
- setArgType(other,q, 0, TYPE_void);
- moveInstruction(other, getPC(other,q),i+2);
- chkProgram(cntxt->usermodule, other);
+ setArgType(mb,q, 0, TYPE_void);
+ moveInstruction(mb, getPC(mb,q),i+2);
+ chkProgram(cntxt->usermodule, mb);
pnet[pnettop].mod = GDKstrdup(getModuleId(sig));
if(pnet[pnettop].mod == NULL) {
msg = createException(SQL,"cquery.register",SQLSTATE(HY001)
MAL_MALLOC_FAIL);
GDKfree(alias);
- freeMalBlk(other);
+ freeMalBlk(mb);
goto unlock;
}
@@ -607,23 +581,23 @@ CQregister(Client cntxt, MalBlkPtr mb, M
if(pnet[pnettop].fcn == NULL) {
msg = createException(SQL,"cquery.register",SQLSTATE(HY001)
MAL_MALLOC_FAIL);
GDKfree(alias);
- freeMalBlk(other);
+ freeMalBlk(mb);
GDKfree(pnet[pnettop].mod);
goto unlock;
}
- pnet[pnettop].stk = prepareMALstack(other, other->vsize);
+ pnet[pnettop].stk = prepareMALstack(mb, mb->vsize);
if(pnet[pnettop].stk == NULL) {
msg = createException(SQL,"cquery.register",SQLSTATE(HY001)
MAL_MALLOC_FAIL);
GDKfree(alias);
- freeMalBlk(other);
+ freeMalBlk(mb);
GDKfree(pnet[pnettop].mod);
GDKfree(pnet[pnettop].fcn);
goto unlock;
}
pnet[pnettop].alias = alias;
- pnet[pnettop].mb = other;
+ pnet[pnettop].mb = mb;
pnet[pnettop].cycles = cycles;
pnet[pnettop].beats = SET_HEARTBEATS(heartbeats);
//subtract the beats value so the CQ will start at the precise moment
diff --git a/sql/backends/monet5/sql_cquery.h b/sql/backends/monet5/sql_cquery.h
--- a/sql/backends/monet5/sql_cquery.h
+++ b/sql/backends/monet5/sql_cquery.h
@@ -69,8 +69,6 @@ sql5_export int pnetLimit, pnettop;
sql5_export int CQlocateQueryExternal(str modname, str fcnname);
sql5_export int CQlocateBasketExternal(str schname, str tblname);
-sql5_export str CQregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
-
sql5_export str CQwait(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list