Changeset: 1bd3bc7e199b for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=1bd3bc7e199b
Modified Files:
sql/backends/monet5/datacell/petrinet.c
Branch: default
Log Message:
Fix multi-threaded execution.
The continuous queries are now properly included in the control function.
diffs (160 lines):
diff --git a/sql/backends/monet5/datacell/petrinet.c b/sql/backends/monet5/datacell/petrinet.c
--- a/sql/backends/monet5/datacell/petrinet.c
+++ b/sql/backends/monet5/datacell/petrinet.c
@@ -78,12 +78,10 @@
#define PNcontrolInfinit 1 /* infinit loop of PNController */
#define PNcontrolEnd 2 /* when all factories are disable PNController
exits */
-/* #define _DEBUG_PETRINET_ */
+#define _DEBUG_PETRINET_
/*static int controlRounds = PNcontrolInfinit;*/
-static MT_Lock petriLock;
-
typedef struct {
char *table;
int bskt; /* basket used */
@@ -193,7 +191,7 @@ str PNregister(Client cntxt, MalBlkPtr m
pnettop++;
msg = PNanalysis(cntxt, s->def);
/* start the scheduler if analysis does not show errors */
- if ( msg == MAL_SUCCEED && status == BSKTINIT)
+ if ( msg == MAL_SUCCEED )
return PNstartThread(ret);
return msg;
}
@@ -286,9 +284,9 @@ str PNstopScheduler(int *ret)
{
int i = 0, j = pnettop;
pnettop = 0; /* don't look at it anymore */
- MT_lock_set(&petriLock, "pncontroller");
+ MT_lock_set(&dcLock, "pncontroller");
status = BSKTSTOP;
- MT_lock_unset(&petriLock, "pncontroller");
+ MT_lock_unset(&dcLock, "pncontroller");
i = 0;
do {
MT_sleep_ms(cycleDelay + 1); /* delay to make it more
tractable */
@@ -309,7 +307,9 @@ str PNresumeScheduler(int *ret)
for( i =0; i< pnettop; i++)
pnet[i].status = BSKTRUNNING;
+ MT_lock_set(&dcLock, "pncontroller");
status = BSKTRUNNING;
+ MT_lock_unset(&dcLock, "pncontroller");
(void) ret;
return MAL_SUCCEED;
}
@@ -320,7 +320,9 @@ str PNpauseScheduler(int *ret)
for( i =0; i< pnettop; i++)
pnet[i].status = BSKTPAUSE;
+ MT_lock_set(&dcLock, "pncontroller");
status = BSKTPAUSE;
+ MT_lock_unset(&dcLock, "pncontroller");
(void) ret;
return MAL_SUCCEED;
}
@@ -467,6 +469,7 @@ PNcontroller(void *dummy)
int m = 0;
str msg;
lng t, analysis, now;
+ char buf[BUFSIZ], *modnme, *fcnnme;
cntxt = mal_clients; /* run as admin */
SQLinitEnvironment(cntxt);
@@ -479,17 +482,17 @@ PNcontroller(void *dummy)
return;
}
- /* create the lock */
- MT_lock_init(&petriLock, "petrinet");
-
/* create a fake procedure to highlight the continuous queries */
- s = newFunction("user", "pnController", FACTORYsymbol);
+ s = newFunction(userRef, "pnController", FUNCTIONsymbol);
+ mb= s->def;
p = getSignature(s);
- getArg(p, 0) = newTmpVariable(mb = s->def, TYPE_void);
+ getArg(p, 0) = newTmpVariable(mb, TYPE_void);
+reinit:
+ MT_lock_set(&dcLock, "pncontroller");
+ status = BSKTRUNNING;
+ mb->stop = 1;
/* create an execution environment for all transitions */
for (i = 0; i < pnettop; i++) {
- char buf[BUFSIZ], *modnme, *fcnnme;
-
BSKTelements(pnet[i].name, buf, &modnme, &fcnnme);
BSKTtolower(modnme);
BSKTtolower(fcnnme);
@@ -503,6 +506,7 @@ PNcontroller(void *dummy)
mnstr_printf(cntxt->fdout, "#Petrinet Controller found
errors\n");
return;
}
+ MT_lock_unset(&dcLock, "pncontroller");
newStack(glb, mb->vtop);
memset((char *) glb, 0, stackSize(mb->vtop));
glb->stktop = mb->vtop;
@@ -510,19 +514,17 @@ PNcontroller(void *dummy)
#ifdef _DEBUG_PETRINET_
printFunction(cntxt->fdout, mb, 0, LIST_MAL_ALL);
#endif
- do {
+ while( status != BSKTSTOP){
if (cycleDelay)
MT_sleep_ms(cycleDelay); /* delay to make it more
tractable */
while (status == BSKTPAUSE)
;
- MT_lock_set(&petriLock, "pncontroller");
- if (status != BSKTSTOP)
- /* collect latest statistics, note that we don't need a
lock here,
- because the count need not be accurate to the usec.
It will simply
- come back. We also only have to check the sources
that are marked
- empty. */
- status = BSKTRUNNING;
- MT_lock_unset(&petriLock, "pncontroller");
+ if ( mb->stop < pnettop + 2)
+ goto reinit; /* new query arrived */
+ /* collect latest statistics, note that we don't need a lock
here,
+ because the count need not be accurate to the usec. It will
simply
+ come back. We also only have to check the sources that are
marked
+ empty. */
now = GDKusec();
for (k = i = 0; status == BSKTRUNNING && i < pnettop; i++)
if ( pnet[i].status != BSKTPAUSE ){
@@ -608,8 +610,10 @@ PNcontroller(void *dummy)
}
}
}
- } while (status != BSKTSTOP);
+ }
+ MT_lock_set(&dcLock, "pncontroller");
status = BSKTINIT;
+ MT_lock_unset(&dcLock, "pncontroller");
(void) dummy;
}
@@ -621,7 +625,7 @@ str PNstartThread(int *ret)
PNdump(&s);
#endif
- if (MT_create_thread(&pid, PNcontroller, &s, MT_THR_DETACHED) != 0)
+ if (status== BSKTINIT && MT_create_thread(&pid, PNcontroller, &s,
MT_THR_DETACHED) != 0)
throw(MAL, "petrinet.startThread", "Process creation failed");
(void) ret;
@@ -635,10 +639,10 @@ static str PNstart(int *ret)
#ifdef _DEBUG_PETRINET_
PNdump(&s);
#endif
- MT_lock_set(&petriLock, "pncontroller");
+ MT_lock_set(&dcLock, "pncontroller");
if (status != BSKTSTOP)
status = BSKTRUNNING;
- MT_lock_unset(&petriLock, "pncontroller");
+ MT_lock_unset(&dcLock, "pncontroller");
/*controlRounds = PNcontrolEnd;*/
PNcontroller(&s);
_______________________________________________
Checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list