Changeset: 7eeaf552e950 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7eeaf552e950
Modified Files:
sql/backends/monet5/sql_cquery.c
sql/server/sql_parser.y
Branch: trails
Log Message:
Fixed the SQL grammar for starting and resuming continuous queries. Fixed the
internal behavior of STOP ALL and PAUSE ALL.
diffs (truncated from 488 to 300 lines):
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -286,20 +286,19 @@ CQlocateMb(MalBlkPtr mb, int* idx, str*
break;
}
assert(i != mb->stop);
- mb2str = instruction2str(mb, NULL, sig, LIST_MAL_CALL);
- if(mb2str == NULL) {
- throw(MAL,call,MAL_MALLOC_FAIL);
+ if((mb2str = instruction2str(mb, NULL, sig, LIST_MAL_CALL)) == NULL) {
+ throw(SQL,call,MAL_MALLOC_FAIL);
}
for (i = 0; i < pnettop; i++){
if (strcmp(pnet[i].stmt, mb2str) == 0) {
- GDKfree(mb2str);
*idx = i;
+ *res = mb2str;
return MAL_SUCCEED;
}
}
+ *idx = i;
*res = mb2str;
- *idx = i;
return MAL_SUCCEED;
}
@@ -317,7 +316,7 @@ CQerror(Client cntxt, MalBlkPtr mb, MalS
idx = CQlocate(sch, fcn);
if( idx == pnettop)
- throw(SQL,"cquery.error","Continuous procedure %s.%s not
accessible",sch,fcn);
+ throw(SQL,"cquery.error","The continuous procedure %s.%s is not
accessible\n",sch,fcn);
pnet[idx].error = GDKstrdup(error);
return MAL_SUCCEED;
@@ -336,7 +335,7 @@ CQshow(Client cntxt, MalBlkPtr mb, MalSt
idx = CQlocate(sch, fcn);
if( idx == pnettop)
- throw(SQL,"cquery.show","continuous procedure %s.%s not
accessible",sch,fcn);
+ throw(SQL,"cquery.show","The continuous procedure %s.%s is not
accessible\n",sch,fcn);
printFunction(cntxt->fdout, pnet[idx].mb, 0, LIST_MAL_NAME |
LIST_MAL_VALUE | LIST_MAL_MAPI);
return MAL_SUCCEED;
@@ -535,7 +534,7 @@ CQprocedure(Client cntxt, MalBlkPtr mb,
return msg;
s = findSymbolInModule(cntxt->nspace, putName(nme));
if (s == NULL)
- throw(SQL, "cqeury.procedure", "Definition of procedure
missing");
+ throw(SQL, "cquery.procedure", "Definition of procedure
missing");
qry = s->def;
chkProgram(cntxt->fdout,cntxt->nspace,qry);
@@ -596,12 +595,12 @@ CQregister(Client cntxt, MalBlkPtr mb, M
(void) pci;
- if(cycles <= 0 && cycles != int_nil){
- msg = createException(SQL,"cquery.register","The cycles value
must be positive");
+ if(cycles < 0 && cycles != int_nil){
+ msg = createException(SQL,"cquery.register","The cycles value
must be non negative\n");
goto finish;
}
- if(heartbeats <= 0){
- msg = createException(SQL,"cquery.register","The heartbeats
value must be positive");
+ if(heartbeats < 0){
+ msg = createException(SQL,"cquery.register","The heartbeats
value must be non negative\n");
goto finish;
}
@@ -700,18 +699,22 @@ static str
CQresumeInternal(Client cntxt, MalBlkPtr mb, int with_alter)
{
mvc* sqlcontext = ((backend *) cntxt->sqlcontext)->mvc;
- str msg = MAL_SUCCEED, mb2str;
+ str msg = MAL_SUCCEED, mb2str = NULL;
int idx, cycles, heartbeats;
+#ifdef DEBUG_CQUERY
+ fprintf(stderr, "#resume scheduler\n");
+#endif
+
if(with_alter) {
cycles = sqlcontext ? sqlcontext->cycles : int_nil;
heartbeats = sqlcontext ? sqlcontext->heartbeats : 1;
- if(cycles <= 0 && cycles != int_nil){
- msg = createException(SQL,"cquery.resume","The cycles
value must be positive");
+ if(cycles < 0 && cycles != int_nil){
+ msg = createException(SQL,"cquery.resume","The cycles
value must be non negative\n");
goto finish;
}
- if(heartbeats <= 0){
- msg = createException(SQL,"cquery.resume","The
heartbeats value must be positive");
+ if(heartbeats < 0){
+ msg = createException(SQL,"cquery.resume","The
heartbeats value must be non negative\n");
goto finish;
}
}
@@ -722,16 +725,14 @@ CQresumeInternal(Client cntxt, MalBlkPtr
goto unlock;
}
if( idx == pnettop) {
- msg = createException(SQL, "cquery.resume", "continuous
procedure %s has not yet started\n", mb2str);
- GDKfree(mb2str);
+ msg = createException(SQL, "cquery.resume", "The continuous
procedure %s has not yet started\n", mb2str);
goto unlock;
}
- if( pnet[idx].status != CQPAUSE)
+ if( pnet[idx].status != CQPAUSE) {
+ msg = createException(SQL, "cquery.resume", "The continuous
procedure %s is already running\n", mb2str);
goto unlock;
+ }
-#ifdef DEBUG_CQUERY
- fprintf(stderr, "#resume scheduler\n");
-#endif
pnet[idx].status = CQWAIT;
if(with_alter) {
pnet[idx].cycles = cycles;
@@ -744,43 +745,13 @@ CQresumeInternal(Client cntxt, MalBlkPtr
}
unlock:
+ if(mb2str)
+ GDKfree(mb2str);
MT_lock_unset(&ttrLock);
finish:
return msg;
}
-static str
-CQresumeInternalRanges(int first, int last)
-{
- str msg = MAL_SUCCEED;
-
-#ifdef DEBUG_CQUERY
- fprintf(stderr, "#resume scheduler\n");
-#endif
- for( ; first < last; first++)
- pnet[first].status = CQWAIT;
-
- /* start the scheduler if needed */
- if(CQinit == 0) {
- msg = CQstartScheduler();
- }
- return msg;
-}
-
-str
-CQresumeAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{
- str msg;
- (void) cntxt;
- (void) mb;
- (void) stk;
- (void) pci;
- MT_lock_set(&ttrLock);
- msg = CQresumeInternalRanges(0, pnettop);
- MT_lock_unset(&ttrLock);
- return msg;
-}
-
str
CQresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
@@ -801,7 +772,7 @@ CQresume(Client cntxt, MalBlkPtr mb, Mal
}
if( k >= 0 )
return CQresumeInternal(cntxt, mb, 1);
- throw(SQL,"cquery.resume","continuous procedure %s.%s not found",
getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)));
+ throw(SQL,"cquery.resume","The continuous procedure %s.%s was not
found\n", getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)));
}
str
@@ -824,33 +795,52 @@ CQresumeNoAlter(Client cntxt, MalBlkPtr
}
if( k >= 0 )
return CQresumeInternal(cntxt, mb, 0);
- throw(SQL,"cquery.resume","continuous procedure %s.%s not found",
getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)));
+ throw(SQL,"cquery.resume","The continuous procedure %s.%s was not
found\n", getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)));
}
-static str
-CQpauseInternalRanges(int first, int last)
+str
+CQresumeAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
+ str msg = MAL_SUCCEED;
+ int i;
+
+ (void) cntxt;
+ (void) mb;
+ (void) stk;
+ (void) pci;
+
#ifdef DEBUG_CQUERY
- fprintf(stderr, "#pause cqueries\n");
+ fprintf(stderr, "#resume scheduler\n");
#endif
- for( ; first < last; first++)
- pnet[first].status = CQPAUSE;
- return MAL_SUCCEED;
+
+ MT_lock_set(&ttrLock);
+ for(i = 0 ; i < pnettop; i++)
+ pnet[i].status = CQWAIT;
+
+ /* start the scheduler if needed */
+ if(CQinit == 0) {
+ msg = CQstartScheduler();
+ }
+ MT_lock_unset(&ttrLock);
+ return msg;
}
static str
CQpauseInternal(MalBlkPtr mb)
{
int idx;
- str msg = MAL_SUCCEED, mb2str;
+ str msg = MAL_SUCCEED, mb2str = NULL;
MT_lock_set(&ttrLock);
if(CQlocateMb(mb, &idx, &mb2str, "cquery.pause") != MAL_SUCCEED) {
goto finish;
}
if( idx == pnettop) {
- msg = createException(SQL, "cquery.pause", "continuous
procedure %s has not yet started\n", mb2str);
- GDKfree(mb2str);
+ msg = createException(SQL, "cquery.pause", "The continuous
procedure %s has not yet started\n", mb2str);
+ goto finish;
+ }
+ if( pnet[idx].status == CQPAUSE) {
+ msg = createException(SQL, "cquery.pause", "The continuous
procedure %s is already paused\n", mb2str);
goto finish;
}
// actually wait if the query was running
@@ -861,23 +851,11 @@ CQpauseInternal(MalBlkPtr mb)
if( pnet[idx].status == CQWAIT)
break;
}
- msg = CQpauseInternalRanges(idx, idx+1);
-finish:
- MT_lock_unset(&ttrLock);
- return msg;
-}
+ pnet[idx].status = CQPAUSE;
-str
-CQpauseAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{
- str msg;
- (void) cntxt;
- (void) mb;
- (void) stk;
- (void) pci;
- //pause all
- MT_lock_set(&ttrLock);
- msg = CQpauseInternalRanges(0, pnettop);
+finish:
+ if(mb2str)
+ GDKfree(mb2str);
MT_lock_unset(&ttrLock);
return msg;
}
@@ -902,7 +880,37 @@ CQpause(Client cntxt, MalBlkPtr mb, MalS
}
if( k >= 0)
return CQpauseInternal(mb);
- throw(SQL,"cquery.pause","continuous procedure %s.%s not found",
getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)));
+ throw(SQL,"cquery.pause","The continuous procedure %s.%s was not
found\n", getModuleId(getInstrPtr(mb,k)), getFunctionId(getInstrPtr(mb,k)));
+}
+
+str
+CQpauseAll(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ str msg = MAL_SUCCEED;
+ int i;
+
+ (void) cntxt;
+ (void) mb;
+ (void) stk;
+ (void) pci;
+
+#ifdef DEBUG_CQUERY
+ fprintf(stderr, "#pause cqueries\n");
+#endif
+
+ MT_lock_set(&ttrLock);
+ for(i = 0 ; i < pnettop; i++) {
+ while( pnet[i].status == CQRUNNING ){
+ MT_lock_unset(&ttrLock);
+ MT_sleep_ms(5);
+ MT_lock_set(&ttrLock);
+ if( pnet[i].status == CQWAIT)
+ break;
+ }
+ pnet[i].status = CQPAUSE;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list