Changeset: 8204adf3f30e for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8204adf3f30e
Modified Files:
sql/backends/monet5/Tests/cquery16.sql
sql/backends/monet5/Tests/cquery16.stable.err
sql/backends/monet5/sql_cat.c
sql/backends/monet5/sql_cquery.c
sql/backends/monet5/sql_cquery.h
sql/backends/monet5/sql_statement.c
sql/backends/monet5/sqlcatalog.mal
sql/include/sql_catalog.h
sql/include/sql_relation.h
sql/server/rel_psm.c
Branch: trails
Log Message:
Compile the UDF and setup parameters just before registering in the Petri-net.
diffs (truncated from 685 to 300 lines):
diff --git a/sql/backends/monet5/Tests/cquery16.sql
b/sql/backends/monet5/Tests/cquery16.sql
--- a/sql/backends/monet5/Tests/cquery16.sql
+++ b/sql/backends/monet5/Tests/cquery16.sql
@@ -4,11 +4,6 @@ create table results16 (a int);
start continuous sys.cq_query16a(); --error
-create procedure cq_query16b() --error
-begin
- start continuous sys.cq_query16a();
-end;
-
create procedure cq_query16a()
begin
insert into results16 (select * from testing16);
diff --git a/sql/backends/monet5/Tests/cquery16.stable.err
b/sql/backends/monet5/Tests/cquery16.stable.err
--- a/sql/backends/monet5/Tests/cquery16.stable.err
+++ b/sql/backends/monet5/Tests/cquery16.stable.err
@@ -31,15 +31,8 @@ stderr of test 'cquery16` in directory '
MAPI = (monetdb) /var/tmp/mtest-25778/.s.monetdb.37794
QUERY = start continuous sys.cq_query16a(); --error
-ERROR = !SELECT: no such operator 'cq_query16a'
-CODE = 42000
-MAPI = (monetdb) /var/tmp/mtest-25778/.s.monetdb.37794
-QUERY = create procedure cq_query16b() --error
- begin
- start continuous sys.cq_query16a();
- end;
-ERROR = !SELECT: no such operator 'cq_query16a'
-CODE = 42000
+ERROR = !Failed to bind procedure sys.cq_query16a
+CODE = 3F000
# 15:37:38 >
# 15:37:38 > "Done."
diff --git a/sql/backends/monet5/sql_cat.c b/sql/backends/monet5/sql_cat.c
--- a/sql/backends/monet5/sql_cat.c
+++ b/sql/backends/monet5/sql_cat.c
@@ -427,12 +427,6 @@ drop_index(Client cntxt, mvc *sql, char
}
static str
-start_cp(Client cntxt, str alias, int action, lng heartbeat, lng startat, int
cycles, MalBlkPtr fcall)
-{
- return CQregister(cntxt, alias, action, heartbeat, startat, cycles,
fcall);
-}
-
-static str
change_single_cp(str alias, int action, lng heartbeat, lng startat, int cycles)
{
if(action & mod_resume_continuous) {
@@ -1312,15 +1306,18 @@ str
SQLstart_cp(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{ mvc *sql = NULL;
str msg;
- str alias = *getArgReference_str(stk, pci, 1);
- int action = *getArgReference_int(stk, pci, 2);
- lng heartbeat = *getArgReference_lng(stk, pci, 3);
- lng startat = *getArgReference_lng(stk, pci, 4);
- int cycles = *getArgReference_int(stk, pci, 5);
- MalBlkPtr fcall = *(MalBlkPtr*) getArgReference(stk, pci, 6);
+ str sname = *getArgReference_str(stk, pci, 1);
+ str fname = *getArgReference_str(stk, pci, 2);
+ int argc = *getArgReference_int(stk, pci, 3);
+ atom **args = *(atom ***) getArgReference(stk, pci, 4);
+ str alias = *getArgReference_str(stk, pci, 5);
+ int action = *getArgReference_int(stk, pci, 6);
+ lng heartbeat = *getArgReference_lng(stk, pci, 7);
+ lng startat = *getArgReference_lng(stk, pci, 8);
+ int cycles = *getArgReference_int(stk, pci, 9);
initcontext();
- msg = start_cp(cntxt, alias, action, heartbeat, startat, cycles, fcall);
+ msg = CQregister(cntxt, sname, fname, argc, args, alias, action,
heartbeat, startat, cycles);
return msg;
}
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -58,6 +58,7 @@ static int pnstatus = CQINIT;
static int cycleDelay = 200; /* be careful, it affects response/throughput
timings */
static MT_Lock ttrLock;
static MT_Id cq_pid = 0;
+static int CQ_counter = 0;
static BAT *CQ_id_tick = 0;
static BAT *CQ_id_mod = 0;
@@ -429,19 +430,37 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
return msg;
}
+#define FREE_CQ_MB(X)
\
+ msg = createException(SQL,"cquery.register",SQLSTATE(HY001)
MAL_MALLOC_FAIL); \
+ if(mb)
\
+ freeMalBlk(mb);
\
+ if(ralias)
\
+ GDKfree(ralias);
\
+ goto X;
+
/* Every SQL statement is wrapped with a caller function that
* regulates transaction bounds, debugger
* The actual function is called with the arguments provided in the call.
*/
str
-CQregister(Client cntxt, str alias, int which, lng heartbeats, lng startat,
int cycles, MalBlkPtr mb)
+CQregister(Client cntxt, str sname, str fname, int argc, atom **args, str
alias, int which, lng heartbeats, lng startat, int cycles)
{
- str msg = MAL_SUCCEED;
- InstrPtr sig = NULL, q;
- Symbol s;
+ str msg = MAL_SUCCEED, rschema = NULL, ralias = NULL;
+ InstrPtr p = NULL, q = NULL;
+ Symbol sym;
CQnode *pnew;
+ MalBlkPtr mb = NULL, prev;
const char* err_message = (which & mod_continuous_function) ?
"function" : "procedure";
- int i, j, idx;
+ char* cq_id = NULL;
+ int i, idx, varid;
+ char buffer[IDLENGTH];
+ backend* be = (backend*) cntxt->sqlcontext;
+ mvc *m = be->mvc;
+ sql_schema *s;
+ sql_subfunc *f;
+ list *l;
+
+ prev = be->mb;
if(cycles < 0 && cycles != CYCLES_NIL){
msg = createException(SQL,"cquery.register",SQLSTATE(42000)
"The cycles value must be non negative\n");
@@ -456,34 +475,132 @@ CQregister(Client cntxt, str alias, int
goto finish;
}
- if(which & mod_continuous_function){ /* for functions we need to remove
the sql.mvc instruction */
- for(i = 1; i< mb->stop; i++){
- sig= getInstrPtr(mb,i);
- if( getFunctionId(sig) == mvcRef){
- removeInstruction(mb, sig);
- }
+ if (!m->sa) {
+ if((m->sa = sa_create()) == NULL) {
+ FREE_CQ_MB(finish)
+ }
+ }
+ if (!be->mb) {
+ if((be->mb = newMalBlk(8)) == NULL) {
+ FREE_CQ_MB(finish)
}
}
- /* extract the actual procedure/function call and check for duplicate */
- for(i = 1; i< mb->stop; i++){
- sig= getInstrPtr(mb,i);
- if( getModuleId(sig) == userRef)
- break;
- }
- if( i == mb->stop){
- msg = createException(SQL,"cquery.register",SQLSTATE(3F000)
"Cannot detect %s call %s.%s.\n",
- err_message,
getModuleId(sig), getFunctionId(sig));
+ rschema = (sname == NULL || strcmp(sname, str_nil) == 0) ?
m->session->schema_name : sname;
+ if((s = mvc_bind_schema(m, rschema)) == NULL) { //bind the schema
+ msg = createException(SQL,"cquery.register",SQLSTATE(3F000)
"Failed to bind schema %s\n", rschema);
goto finish;
}
- if(!alias || strcmp(alias, str_nil) == 0) {
- alias = GDKstrdup(getFunctionId(sig));
+ if((l = list_create(NULL)) == NULL) {
+ FREE_CQ_MB(finish)
+ }
+ for (i = 0; i < argc; i++) { //prepare the arguments for the backend
creation
+ atom *a = args[i];
+ list_append(l, stmt_varnr(be, i, &a->tpe));
+ }
+ if(argc)
+ f = sql_find_func(m->sa, s, fname, argc, (which &
mod_continuous_function) ? F_FUNC : F_PROC, NULL); //bind the UDF
+ else
+ f = sql_bind_func_(m->sa, s, fname, l, (which &
mod_continuous_function) ? F_FUNC : F_PROC);
+ if(!f) {
+ msg = createException(SQL,"cquery.register",SQLSTATE(3F000)
"Failed to bind %s %s.%s\n", err_message, sname, fname);
+ GDKfree(ralias);
+ list_destroy(l);
+ goto finish;
+ }
+ if (backend_create_subfunc(be, f, l) < 0) { //create the backend
function
+ msg = createException(SQL,"cquery.register",SQLSTATE(3F000)
"Failed to generate backend function\n");
+ GDKfree(ralias);
+ list_destroy(l);
+ goto finish;
+ }
+ list_destroy(l);
+
+ (void) snprintf(buffer, sizeof(buffer), "cq_%d", ++CQ_counter); //set
the CQ ID
+ if((cq_id = GDKstrdup(buffer)) == NULL) {
+ FREE_CQ_MB(finish)
+ }
+ if((mb = newMalBlk(8)) == NULL) { //create MalBlk and initialize it
+ GDKfree(cq_id);
+ FREE_CQ_MB(finish)
+ }
+ if((p = newInstruction(NULL, "user", cq_id)) == NULL) {
+ GDKfree(cq_id);
+ FREE_CQ_MB(finish)
+ }
+ p->token = FUNCTIONsymbol;
+ p->barrier = 0;
+ varid = newVariable(mb, cq_id, strlen(cq_id), TYPE_any);
+ setDestVar(p, varid);
+ pushInstruction(mb, p);
+
+ for (i = 0; i < argc; i++) { //add variables to the MAL block
+ atom *a = args[i];
+ int type = atom_type(a)->type->localtype;
+ varid = 0;
+
+ (void) snprintf(buffer, sizeof(buffer), "A%d", i);
+ a->varid = varid = newVariable(mb, buffer, strlen(buffer),
type);
+ if (varid < 0) {
+ FREE_CQ_MB(finish)
+ }
+ if ((p = pushArgument(mb, p, varid)) == NULL) {
+ FREE_CQ_MB(finish)
+ }
+ setVarType(mb, varid, type);
+ setVarUDFtype(mb, 0);
+ }
+ for (i = 0; i < argc; i++) { //add assignments for arguments
+ p = newAssignment(mb);
+ if (p && args[i]->varid >= 0) {
+ p = pushArgument(mb, p, args[i]->varid);
+ } else if(p) {
+ (void) snprintf(buffer, sizeof(buffer), "A%d", i);
+ p = pushArgumentId(mb, p, buffer);
+ }
+ if (p == NULL) {
+ FREE_CQ_MB(finish)
+ }
+ }
+ if ((p = newStmt(mb, "user", fname)) == NULL) { //add the UDF call
statement
+ FREE_CQ_MB(finish)
+ }
+ /*if (f->res && list_length(f->res)) {
+ sql_subtype *res = f->res->h->data;
+ setVarType(mb, getArg(q, 0), res->type->localtype);
+ setVarUDFtype(mb, getArg(q, 0));
+ }*/
+ for (i = 0; i < argc; i++) { //add arguments assignments
+ if ((p = pushArgument(mb, p, i + 2)) == NULL) {
+ FREE_CQ_MB(finish)
+ }
+ }
+ for (i = 0; i < argc; i++) { //initialize arguments assignments
+ atom *arg = args[i];
+ ValPtr val = (ValPtr) &arg->data;
+ if (VALcopy(&mb->var[i + 2].value, val) == NULL) {
+ FREE_CQ_MB(finish)
+ }
+ setVarConstant(mb, i + 2);
+ setVarFixed(mb, i + 2);
+ }
+
+ if(!alias || strcmp(alias, str_nil) == 0) { //set the alias
+ if((ralias = GDKstrdup(fname)) == NULL) {
+ FREE_CQ_MB(finish)
+ }
} else {
- alias = GDKstrdup(alias);
+ ralias = GDKstrdup(alias);
+ }
+ if(ralias == NULL) {
+ FREE_CQ_MB(finish)
}
- if( alias == NULL) {
- msg = createException(SQL,"cquery.register",SQLSTATE(HY001)
MAL_MALLOC_FAIL);
+ if ((sym = findSymbol(cntxt->usermodule, "user", fname)) == NULL){ //
access the actual procedure body
+ msg = createException(SQL,"cquery.register",SQLSTATE(3F000)
"Cannot find %s user.%s.\n", err_message, fname);
+ GDKfree(cq_id);
+ GDKfree(ralias);
+ freeMalBlk(mb);
goto finish;
}
@@ -495,10 +612,7 @@ CQregister(Client cntxt, str alias, int
if( pnet == 0){
pnew = (CQnode *) GDKzalloc((INITIAL_MAXCQ) * sizeof(CQnode));
if( pnew == NULL) {
- msg =
createException(SQL,"cquery.register",SQLSTATE(HY001) MAL_MALLOC_FAIL);
- GDKfree(alias);
- freeMalBlk(mb);
- goto unlock;
+ FREE_CQ_MB(unlock)
}
pnetLimit = INITIAL_MAXCQ;
pnet = pnew;
@@ -506,97 +620,66 @@ CQregister(Client cntxt, str alias, int
if (pnettop == pnetLimit) {
pnew = (CQnode *) GDKrealloc(pnet, (pnetLimit+INITIAL_MAXCQ) *
sizeof(CQnode));
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list