Changeset: 3e75a249bbe3 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=3e75a249bbe3
Modified Files:
        sql/backends/monet5/sql_cquery.c
        sql/backends/monet5/sql_execute.c
Branch: trails
Log Message:

Added defensive checks for allocation failures; in addition, continuous
procedures are now properly identified according to their parameters at start


diffs (truncated from 546 to 300 lines):

diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -70,6 +70,8 @@ static int pnstatus = CQINIT;
 static int cycleDelay = 200; /* be careful, it affects response/throughput 
timings */
 MT_Lock ttrLock MT_LOCK_INITIALIZER("cqueryLock");
 
+#define SET_HEARTBEATS(X) ((X) * 1000) /* minimal 1 ms; parenthesized so an expression argument is scaled as a whole */
+
 static void
 CQfree(int idx)
 {
@@ -87,7 +89,7 @@ CQfree(int idx)
 }
 
 /* We need a lock table for all stream tables considered
- * It is better to use a slot in the BATdescriptor 
+ * It is better to use a slot in the BATdescriptor
  * A sanity routine should be available to check for any forgotten lock frees.
  */
 
@@ -151,7 +153,7 @@ str
 CQlog( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
        BAT *tickbat = 0, *modbat = 0, *fcnbat = 0, *timebat = 0, *errbat = 0;
        bat *tickret, *modret, *fcnret, *timeret, *errorret;
-       
+
        (void) cntxt;
        (void) mb;
 
@@ -200,7 +202,7 @@ CQstatus( Client cntxt, MalBlkPtr mb, Ma
        bat *tickret = 0, *modret = 0, *fcnret = 0, *statusret = 0, *errorret = 
0, *stmtret = 0;
        int idx;
        str msg= MAL_SUCCEED;
-       
+
        (void) cntxt;
        (void) mb;
 
@@ -271,6 +273,36 @@ CQlocate(str modname, str fcnname)
        return i;
 }
 
+static str /* look up the pnet[] entry whose stored stmt text equals the user-module call found in mb */
+CQlocateMb(MalBlkPtr mb, int* idx, str* res, const char* call)
+{ /* *idx: index of the match, or pnettop when none; *res: call text, written only when no match; call: name used in the exception */
+       int i;
+       InstrPtr sig = getInstrPtr(mb,0);
+       str mb2str;
+       /* scan from instruction 1 for the first one whose module id is userRef */
+       for(i = 1; i< mb->stop; i++){
+               sig= getInstrPtr(mb,i);
+               if( getModuleId(sig) == userRef)
+                       break;
+       }
+       assert(i != mb->stop); /* expects that such an instruction was found */
+       mb2str = instruction2str(mb, NULL, sig, LIST_MAL_CALL); /* same renderer and flag CQregister uses to fill pnet[].stmt */
+       if(mb2str == NULL) { /* a NULL result is reported as an allocation failure */
+               throw(MAL,call,MAL_MALLOC_FAIL);
+       }
+       /* compare the rendered call against the stmt of every entry below pnettop */
+       for (i = 0; i < pnettop; i++){
+               if (strcmp(pnet[i].stmt, mb2str) == 0) {
+                       GDKfree(mb2str); /* match: the text is released here and *res is left untouched */
+                       *idx = i;
+                       return MAL_SUCCEED;
+               }
+       }
+       *res = mb2str; /* no match: the text is handed to the caller and not freed here */
+       *idx = i; /* the loop ran to completion, so i == pnettop */
+       return MAL_SUCCEED;
+}
+
 /* capture and remember errors: WARNING no locks in this call yet! */
 str
 CQerror(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
@@ -285,7 +317,7 @@ CQerror(Client cntxt, MalBlkPtr mb, MalS
 
        idx = CQlocate(sch, fcn);
        if( idx == pnettop)
-               throw(SQL,"cquery.error","Continuous procedure %s.%s not 
accessible\n",sch,fcn);
+               throw(SQL,"cquery.error","Continuous procedure %s.%s not 
accessible",sch,fcn);
 
        pnet[idx].error = GDKstrdup(error);
        return MAL_SUCCEED;
@@ -304,7 +336,7 @@ CQshow(Client cntxt, MalBlkPtr mb, MalSt
 
        idx = CQlocate(sch, fcn);
        if( idx == pnettop)
-               throw(SQL,"cquery.show","Continuous procedure %s.%s not 
accessible\n",sch,fcn);
+               throw(SQL,"cquery.show","continuous procedure %s.%s not 
accessible",sch,fcn);
 
        printFunction(cntxt->fdout, pnet[idx].mb, 0, LIST_MAL_NAME | 
LIST_MAL_VALUE  | LIST_MAL_MAPI);
        return MAL_SUCCEED;
@@ -328,7 +360,7 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
                        sch = getVarConstant(mb, getArg(p,2)).val.sval;
                        tbl = getVarConstant(mb, getArg(p,3)).val.sval;
 
-                       // find the stream basket definition 
+                       // find the stream basket definition
                        bskt = BSKTlocate(sch,tbl);
                        if( bskt == 0){
                                msg = BSKTregisterInternal(cntxt,mb,sch,tbl);
@@ -365,7 +397,7 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
                        sch = getVarConstant(mb, getArg(p,1)).val.sval;
                        tbl = getVarConstant(mb, getArg(p,2)).val.sval;
 
-                       // find the stream basket definition 
+                       // find the stream basket definition
                        bskt = BSKTlocate(sch,tbl);
                        if( bskt == 0){
                                msg = BSKTregisterInternal(cntxt,mb,sch,tbl);
@@ -422,7 +454,6 @@ CQprocedureStmt(Client cntxt, MalBlkPtr 
        return createException(SQL,"cquery.register","SQL procedure missing");
 }
 
-
 static str
 CQregisterInternal(Client cntxt, str modnme, str fcnnme)
 {
@@ -443,7 +474,7 @@ CQregisterInternal(Client cntxt, str mod
        mb = s->def;
        sig = getInstrPtr(mb,0);
 
-       if (pnettop == MAXCQ) 
+       if (pnettop == MAXCQ)
                return createException(MAL,"cquery.register","Too many 
transitions");
 
 #ifdef DEBUG_CQUERY
@@ -530,7 +561,7 @@ CQprocedure(Client cntxt, MalBlkPtr mb, 
        printFunction(cntxt->fdout, s->def, 0, LIST_MAL_ALL);
 #endif
        /* optimize the code and register at scheduler */
-       if (msg == MAL_SUCCEED) 
+       if (msg == MAL_SUCCEED)
                addtoMalBlkHistory(mb);
        if (msg == MAL_SUCCEED) {
 #ifdef DEBUG_CQUERY
@@ -581,7 +612,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M
                        break;
        }
        if( i == mb->stop){
-               msg = createException(SQL,"cquery.register","Can not detect 
procedure call %s.%s.\n",
+               msg = createException(SQL,"cquery.register","Cannot detect 
procedure call %s.%s.\n",
                getModuleId(sig), getFunctionId(sig));
                goto finish;
        }
@@ -599,9 +630,17 @@ CQregister(Client cntxt, MalBlkPtr mb, M
                goto finish;
        }
        q = newStmt(mb, sqlRef, transactionRef);
+       if(q == NULL) {
+               msg = createException(SQL,"cquery.register",MAL_MALLOC_FAIL);
+               goto finish;
+       }
        setArgType(mb,q, 0, TYPE_void);
        moveInstruction(mb, getPC(mb,q),i);
        q = newStmt(mb, sqlRef, commitRef);
+       if(q == NULL) {
+               msg = createException(SQL,"cquery.register",MAL_MALLOC_FAIL);
+               goto finish;
+       }
        setArgType(mb,q, 0, TYPE_void);
        moveInstruction(mb, getPC(mb,q),i+2);
        chkProgram(cntxt->fdout, cntxt->nspace, mb);
@@ -611,17 +650,44 @@ CQregister(Client cntxt, MalBlkPtr mb, M
 #endif
        MT_lock_set(&ttrLock);
        pnet[pnettop].mod = GDKstrdup(getModuleId(sig));
+       if(pnet[pnettop].mod == NULL) {
+               msg = createException(SQL,"cquery.register",MAL_MALLOC_FAIL);
+               goto unlock;
+       }
+
        pnet[pnettop].fcn = GDKstrdup(getFunctionId(sig));
+       if(pnet[pnettop].fcn == NULL) {
+               msg = createException(SQL,"cquery.register",MAL_MALLOC_FAIL);
+               GDKfree(pnet[pnettop].mod);
+               goto unlock;
+       }
+
        pnet[pnettop].stmt = instruction2str(mb,stk,sig,LIST_MAL_CALL);
+       if(pnet[pnettop].stmt == NULL) {
+               msg = createException(SQL,"cquery.register",MAL_MALLOC_FAIL);
+               GDKfree(pnet[pnettop].mod);
+               GDKfree(pnet[pnettop].fcn);
+               goto unlock;
+       }
+
        pnet[pnettop].mb = mb;
+
        pnet[pnettop].stk = prepareMALstack(mb, mb->vsize);
+       if(pnet[pnettop].stk == NULL) {
+               msg = createException(SQL,"cquery.register",MAL_MALLOC_FAIL);
+               GDKfree(pnet[pnettop].mod);
+               GDKfree(pnet[pnettop].fcn);
+               GDKfree(pnet[pnettop].stmt);
+               goto unlock;
+       }
        pnet[pnettop].cycles = cycles;
-       pnet[pnettop].beats = heartbeats * 1000;
+       pnet[pnettop].beats = SET_HEARTBEATS(heartbeats);
        pnet[pnettop].run  = lng_nil;
        pnet[pnettop].seen = *timestamp_nil;
        pnet[pnettop].status = CQWAIT;
        pnettop++;
 
+unlock:
        MT_lock_unset(&ttrLock);
 finish:
        if(!msg && CQinit == 0) { /* start the scheduler if needed */
@@ -631,14 +697,12 @@ finish:
 }
 
 static str
-CQresumeInternal(Client cntxt, str modnme, str fcnnme, int with_alter)
+CQresumeInternal(Client cntxt, MalBlkPtr mb, int with_alter)
 {
        mvc* sqlcontext = ((backend *) cntxt->sqlcontext)->mvc;
-       str msg = MAL_SUCCEED;
+       str msg = MAL_SUCCEED, mb2str;
        int idx, cycles, heartbeats;
 
-       MT_lock_set(&ttrLock);
-
        if(with_alter) {
                cycles = sqlcontext ? sqlcontext->cycles : int_nil;
                heartbeats = sqlcontext ? sqlcontext->heartbeats : 1;
@@ -652,13 +716,18 @@ CQresumeInternal(Client cntxt, str modnm
                }
        }
 
-       idx = CQlocate(modnme, fcnnme);
+       MT_lock_set(&ttrLock);
+
+       if(CQlocateMb(mb, &idx, &mb2str, "cquery.resume") != MAL_SUCCEED) {
+               goto unlock;
+       }
        if( idx == pnettop) {
-               msg = createException(SQL, "cquery.resume", "Continuous 
procedure %s.%s not accessible\n", modnme, fcnnme);
-               goto finish;
+               msg = createException(SQL, "cquery.resume", "continuous 
procedure %s has not yet started\n", mb2str);
+               GDKfree(mb2str);
+               goto unlock;
        }
        if( pnet[idx].status != CQPAUSE)
-               goto finish;
+               goto unlock;
 
 #ifdef DEBUG_CQUERY
        fprintf(stderr, "#resume scheduler\n");
@@ -666,7 +735,7 @@ CQresumeInternal(Client cntxt, str modnm
        pnet[idx].status = CQWAIT;
        if(with_alter) {
                pnet[idx].cycles = cycles;
-               pnet[idx].beats = heartbeats * 1000;
+               pnet[idx].beats = SET_HEARTBEATS(heartbeats);
        }
 
        /* start the scheduler if needed */
@@ -674,8 +743,9 @@ CQresumeInternal(Client cntxt, str modnm
                msg = CQstartScheduler();
        }
 
+unlock:
+       MT_lock_unset(&ttrLock);
 finish:
-       MT_lock_unset(&ttrLock);
        return msg;
 }
 
@@ -730,8 +800,8 @@ CQresume(Client cntxt, MalBlkPtr mb, Mal
                }
        }
        if( k >= 0 )
-               return CQresumeInternal(cntxt, getModuleId(getInstrPtr(mb,k)), 
getFunctionId(getInstrPtr(mb,k)), 1);
-       throw(SQL,"cquery.resume","Continuous query not found ");
+               return CQresumeInternal(cntxt, mb, 1);
+       throw(SQL,"cquery.resume","continuous procedure %s.%s not found", 
getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)));
 }
 
 str
@@ -753,8 +823,8 @@ CQresumeNoAlter(Client cntxt, MalBlkPtr 
                }
        }
        if( k >= 0 )
-               return CQresumeInternal(cntxt, getModuleId(getInstrPtr(mb,k)), 
getFunctionId(getInstrPtr(mb,k)), 0);
-       throw(SQL,"cquery.resume","Continuous query not found ");
+               return CQresumeInternal(cntxt, mb, 0);
+       throw(SQL,"cquery.resume","continuous procedure %s.%s not found", 
getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)));
 }
 
 static str
@@ -769,21 +839,24 @@ CQpauseInternalRanges(int first, int las
 }
 
 static str
-CQpauseInternal(str modnme, str fcnnme)
+CQpauseInternal(MalBlkPtr mb)
 {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to