Changeset: 3e75a249bbe3 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=3e75a249bbe3
Modified Files:
	sql/backends/monet5/sql_cquery.c
	sql/backends/monet5/sql_execute.c
Branch: trails
Log Message:
Added defensive lines for allocations plus the continuous procedures are properly identified according to their parameters at start diffs (truncated from 546 to 300 lines): diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c --- a/sql/backends/monet5/sql_cquery.c +++ b/sql/backends/monet5/sql_cquery.c @@ -70,6 +70,8 @@ static int pnstatus = CQINIT; static int cycleDelay = 200; /* be careful, it affects response/throughput timings */ MT_Lock ttrLock MT_LOCK_INITIALIZER("cqueryLock"); +#define SET_HEARTBEATS(X) X * 1000 /* minimal 1 ms */ + static void CQfree(int idx) { @@ -87,7 +89,7 @@ CQfree(int idx) } /* We need a lock table for all stream tables considered - * It is better to use a slot in the BATdescriptor + * It is better to use a slot in the BATdescriptor * A sanity routine should be available to check for any forgotten lock frees. */ @@ -151,7 +153,7 @@ str CQlog( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){ BAT *tickbat = 0, *modbat = 0, *fcnbat = 0, *timebat = 0, *errbat = 0; bat *tickret, *modret, *fcnret, *timeret, *errorret; - + (void) cntxt; (void) mb; @@ -200,7 +202,7 @@ CQstatus( Client cntxt, MalBlkPtr mb, Ma bat *tickret = 0, *modret = 0, *fcnret = 0, *statusret = 0, *errorret = 0, *stmtret = 0; int idx; str msg= MAL_SUCCEED; - + (void) cntxt; (void) mb; @@ -271,6 +273,36 @@ CQlocate(str modname, str fcnname) return i; } +static str +CQlocateMb(MalBlkPtr mb, int* idx, str* res, const char* call) +{ + int i; + InstrPtr sig = getInstrPtr(mb,0); + str mb2str; + + for(i = 1; i< mb->stop; i++){ + sig= getInstrPtr(mb,i); + if( getModuleId(sig) == userRef) + break; + } + assert(i != mb->stop); + mb2str = instruction2str(mb, NULL, sig, LIST_MAL_CALL); + if(mb2str == NULL) { + throw(MAL,call,MAL_MALLOC_FAIL); + } + + for (i = 0; i < pnettop; i++){ + if (strcmp(pnet[i].stmt, mb2str) == 0) { + GDKfree(mb2str); + *idx = i; + return MAL_SUCCEED; + } + } + *res = mb2str; + *idx = i; + return MAL_SUCCEED; +} + /* capture 
and remember errors: WARNING no locks in this call yet! */ str CQerror(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) @@ -285,7 +317,7 @@ CQerror(Client cntxt, MalBlkPtr mb, MalS idx = CQlocate(sch, fcn); if( idx == pnettop) - throw(SQL,"cquery.error","Continuous procedure %s.%s not accessible\n",sch,fcn); + throw(SQL,"cquery.error","Continuous procedure %s.%s not accessible",sch,fcn); pnet[idx].error = GDKstrdup(error); return MAL_SUCCEED; @@ -304,7 +336,7 @@ CQshow(Client cntxt, MalBlkPtr mb, MalSt idx = CQlocate(sch, fcn); if( idx == pnettop) - throw(SQL,"cquery.show","Continuous procedure %s.%s not accessible\n",sch,fcn); + throw(SQL,"cquery.show","continuous procedure %s.%s not accessible",sch,fcn); printFunction(cntxt->fdout, pnet[idx].mb, 0, LIST_MAL_NAME | LIST_MAL_VALUE | LIST_MAL_MAPI); return MAL_SUCCEED; @@ -328,7 +360,7 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i sch = getVarConstant(mb, getArg(p,2)).val.sval; tbl = getVarConstant(mb, getArg(p,3)).val.sval; - // find the stream basket definition + // find the stream basket definition bskt = BSKTlocate(sch,tbl); if( bskt == 0){ msg = BSKTregisterInternal(cntxt,mb,sch,tbl); @@ -365,7 +397,7 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i sch = getVarConstant(mb, getArg(p,1)).val.sval; tbl = getVarConstant(mb, getArg(p,2)).val.sval; - // find the stream basket definition + // find the stream basket definition bskt = BSKTlocate(sch,tbl); if( bskt == 0){ msg = BSKTregisterInternal(cntxt,mb,sch,tbl); @@ -422,7 +454,6 @@ CQprocedureStmt(Client cntxt, MalBlkPtr return createException(SQL,"cquery.register","SQL procedure missing"); } - static str CQregisterInternal(Client cntxt, str modnme, str fcnnme) { @@ -443,7 +474,7 @@ CQregisterInternal(Client cntxt, str mod mb = s->def; sig = getInstrPtr(mb,0); - if (pnettop == MAXCQ) + if (pnettop == MAXCQ) return createException(MAL,"cquery.register","Too many transitions"); #ifdef DEBUG_CQUERY @@ -530,7 +561,7 @@ CQprocedure(Client cntxt, MalBlkPtr mb, 
printFunction(cntxt->fdout, s->def, 0, LIST_MAL_ALL); #endif /* optimize the code and register at scheduler */ - if (msg == MAL_SUCCEED) + if (msg == MAL_SUCCEED) addtoMalBlkHistory(mb); if (msg == MAL_SUCCEED) { #ifdef DEBUG_CQUERY @@ -581,7 +612,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M break; } if( i == mb->stop){ - msg = createException(SQL,"cquery.register","Can not detect procedure call %s.%s.\n", + msg = createException(SQL,"cquery.register","Cannot detect procedure call %s.%s.\n", getModuleId(sig), getFunctionId(sig)); goto finish; } @@ -599,9 +630,17 @@ CQregister(Client cntxt, MalBlkPtr mb, M goto finish; } q = newStmt(mb, sqlRef, transactionRef); + if(q == NULL) { + msg = createException(SQL,"cquery.register",MAL_MALLOC_FAIL); + goto finish; + } setArgType(mb,q, 0, TYPE_void); moveInstruction(mb, getPC(mb,q),i); q = newStmt(mb, sqlRef, commitRef); + if(q == NULL) { + msg = createException(SQL,"cquery.register",MAL_MALLOC_FAIL); + goto finish; + } setArgType(mb,q, 0, TYPE_void); moveInstruction(mb, getPC(mb,q),i+2); chkProgram(cntxt->fdout, cntxt->nspace, mb); @@ -611,17 +650,44 @@ CQregister(Client cntxt, MalBlkPtr mb, M #endif MT_lock_set(&ttrLock); pnet[pnettop].mod = GDKstrdup(getModuleId(sig)); + if(pnet[pnettop].mod == NULL) { + msg = createException(SQL,"cquery.register",MAL_MALLOC_FAIL); + goto unlock; + } + pnet[pnettop].fcn = GDKstrdup(getFunctionId(sig)); + if(pnet[pnettop].fcn == NULL) { + msg = createException(SQL,"cquery.register",MAL_MALLOC_FAIL); + GDKfree(pnet[pnettop].mod); + goto unlock; + } + pnet[pnettop].stmt = instruction2str(mb,stk,sig,LIST_MAL_CALL); + if(pnet[pnettop].stmt == NULL) { + msg = createException(SQL,"cquery.register",MAL_MALLOC_FAIL); + GDKfree(pnet[pnettop].mod); + GDKfree(pnet[pnettop].fcn); + goto unlock; + } + pnet[pnettop].mb = mb; + pnet[pnettop].stk = prepareMALstack(mb, mb->vsize); + if(pnet[pnettop].stk == NULL) { + msg = createException(SQL,"cquery.register",MAL_MALLOC_FAIL); + 
GDKfree(pnet[pnettop].mod); + GDKfree(pnet[pnettop].fcn); + GDKfree(pnet[pnettop].stmt); + goto unlock; + } pnet[pnettop].cycles = cycles; - pnet[pnettop].beats = heartbeats * 1000; + pnet[pnettop].beats = SET_HEARTBEATS(heartbeats); pnet[pnettop].run = lng_nil; pnet[pnettop].seen = *timestamp_nil; pnet[pnettop].status = CQWAIT; pnettop++; +unlock: MT_lock_unset(&ttrLock); finish: if(!msg && CQinit == 0) { /* start the scheduler if needed */ @@ -631,14 +697,12 @@ finish: } static str -CQresumeInternal(Client cntxt, str modnme, str fcnnme, int with_alter) +CQresumeInternal(Client cntxt, MalBlkPtr mb, int with_alter) { mvc* sqlcontext = ((backend *) cntxt->sqlcontext)->mvc; - str msg = MAL_SUCCEED; + str msg = MAL_SUCCEED, mb2str; int idx, cycles, heartbeats; - MT_lock_set(&ttrLock); - if(with_alter) { cycles = sqlcontext ? sqlcontext->cycles : int_nil; heartbeats = sqlcontext ? sqlcontext->heartbeats : 1; @@ -652,13 +716,18 @@ CQresumeInternal(Client cntxt, str modnm } } - idx = CQlocate(modnme, fcnnme); + MT_lock_set(&ttrLock); + + if(CQlocateMb(mb, &idx, &mb2str, "cquery.resume") != MAL_SUCCEED) { + goto unlock; + } if( idx == pnettop) { - msg = createException(SQL, "cquery.resume", "Continuous procedure %s.%s not accessible\n", modnme, fcnnme); - goto finish; + msg = createException(SQL, "cquery.resume", "continuous procedure %s has not yet started\n", mb2str); + GDKfree(mb2str); + goto unlock; } if( pnet[idx].status != CQPAUSE) - goto finish; + goto unlock; #ifdef DEBUG_CQUERY fprintf(stderr, "#resume scheduler\n"); @@ -666,7 +735,7 @@ CQresumeInternal(Client cntxt, str modnm pnet[idx].status = CQWAIT; if(with_alter) { pnet[idx].cycles = cycles; - pnet[idx].beats = heartbeats * 1000; + pnet[idx].beats = SET_HEARTBEATS(heartbeats); } /* start the scheduler if needed */ @@ -674,8 +743,9 @@ CQresumeInternal(Client cntxt, str modnm msg = CQstartScheduler(); } +unlock: + MT_lock_unset(&ttrLock); finish: - MT_lock_unset(&ttrLock); return msg; } @@ -730,8 +800,8 @@ 
CQresume(Client cntxt, MalBlkPtr mb, Mal } } if( k >= 0 ) - return CQresumeInternal(cntxt, getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)), 1); - throw(SQL,"cquery.resume","Continuous query not found "); + return CQresumeInternal(cntxt, mb, 1); + throw(SQL,"cquery.resume","continuous procedure %s.%s not found", getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k))); } str @@ -753,8 +823,8 @@ CQresumeNoAlter(Client cntxt, MalBlkPtr } } if( k >= 0 ) - return CQresumeInternal(cntxt, getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)), 0); - throw(SQL,"cquery.resume","Continuous query not found "); + return CQresumeInternal(cntxt, mb, 0); + throw(SQL,"cquery.resume","continuous procedure %s.%s not found", getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k))); } static str @@ -769,21 +839,24 @@ CQpauseInternalRanges(int first, int las } static str -CQpauseInternal(str modnme, str fcnnme) +CQpauseInternal(MalBlkPtr mb) { _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list