Changeset: e3851fde20a3 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e3851fde20a3
Modified Files:
monetdb5/mal/mal_dataflow.c
Branch: SciQL-2
Log Message:
Merge with Feb2013 branch.
diffs (154 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -110,26 +110,29 @@ DFLOWgraphSize(MalBlkPtr mb, int start,
static Queue*
q_create(int sz)
{
+ const char* name = "q_create";
Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
if (q == NULL)
return NULL;
q->size = ((sz << 1) >> 1); /* we want a multiple of 2 */
q->last = 0;
- q->data = (void*)GDKmalloc(sizeof(FlowEvent) * q->size);
+ q->data = (FlowEvent*) GDKmalloc(sizeof(FlowEvent) * q->size);
if (q->data == NULL) {
GDKfree(q);
return NULL;
}
- MT_lock_init(&q->l, "q_create");
- MT_sema_init(&q->s, 0, "q_create");
+ (void) name; /* in case MT_LOCK_TRACE is not enabled in gdk_system.h */
+ MT_lock_init(&q->l, name);
+ MT_sema_init(&q->s, 0, name);
return q;
}
static void
q_destroy(Queue *q)
{
+ assert(q);
MT_lock_destroy(&q->l);
MT_sema_destroy(&q->s);
GDKfree(q->data);
@@ -141,16 +144,20 @@ q_destroy(Queue *q)
static void
q_enqueue_(Queue *q, FlowEvent d)
{
+ assert(q);
assert(d);
if (q->last == q->size) {
q->size <<= 1;
- q->data = GDKrealloc(q->data, sizeof(FlowEvent) * q->size);
+ q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) *
q->size);
+ assert(q->data);
}
q->data[q->last++] = d;
}
static void
q_enqueue(Queue *q, FlowEvent d)
{
+ assert(q);
+ assert(d);
MT_lock_set(&q->l, "q_enqueue");
q_enqueue_(q, d);
MT_lock_unset(&q->l, "q_enqueue");
@@ -169,20 +176,23 @@ q_requeue_(Queue *q, FlowEvent d)
{
int i;
+ assert(q);
assert(d);
if (q->last == q->size) {
/* enlarge buffer */
q->size <<= 1;
- q->data = GDKrealloc(q->data, sizeof(void*) * q->size);
+ q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) *
q->size);
+ assert(q->data);
}
for (i = q->last; i > 0; i--)
q->data[i] = q->data[i - 1];
- q->data[0] = (void*)d;
+ q->data[0] = d;
q->last++;
}
static void
q_requeue(Queue *q, FlowEvent d)
{
+ assert(q);
assert(d);
MT_lock_set(&q->l, "q_requeue");
q_requeue_(q, d);
@@ -196,12 +206,13 @@ q_dequeue(Queue *q)
{
void *r = NULL;
+ assert(q);
MT_sema_down(&q->s, "q_dequeue");
MT_lock_set(&q->l, "q_dequeue");
- assert(q->last);
+ assert(q->last > 0);
if (q->last > 0) {
/* LIFO favors garbage collection */
- r = q->data[--q->last];
+ r = (void*) q->data[--q->last];
q->data[q->last] = 0;
}
/* else: terminating */
@@ -260,10 +271,12 @@ DFLOWworker(void *t)
wq = workerqueue[id];
if (fnxt == 0)
fe = q_dequeue(todo[wq]);
- else fe = fnxt;
+ else
+ fe = fnxt;
fnxt = 0;
assert(fe);
flow = fe->flow;
+ assert(flow);
/* whenever we have a (concurrent) error, skip it */
if (flow->error) {
@@ -277,6 +290,7 @@ DFLOWworker(void *t)
#ifdef USE_MAL_ADMISSION
if (MALadmission(fe->argclaim, fe->hotclaim)) {
fe->hotclaim = 0; /* don't assume priority
anymore */
+ assert(todo[wq]);
if (todo[wq]->last == 0)
MT_sleep_ms(DELAYUNIT);
q_requeue(todo[wq], fe);
@@ -314,6 +328,7 @@ DFLOWworker(void *t)
#ifdef USE_MAL_ADMISSION
{
InstrPtr p = getInstrPtr(flow->mb, fe->pc);
+ assert(p);
fe->hotclaim = 0;
for (i = 0; i < p->retc; i++)
fe->hotclaim += getMemoryClaim(flow->mb, flow->stk,
fe->pc, i, FALSE);
@@ -335,6 +350,7 @@ DFLOWworker(void *t)
q_enqueue(flow->done, fe);
if ( fnxt == 0) {
+ assert(todo[wq]);
if (todo[wq]->last == 0)
profilerHeartbeatEvent("wait");
else
@@ -513,11 +529,14 @@ DFLOWscheduler(DataFlow flow)
int j;
InstrPtr p;
#endif
- int tasks=0, actions = flow->stop - flow->start;
+ int tasks=0, actions;
str ret = MAL_SUCCEED;
FlowEvent fe, f = 0;
int wq;
+ if (flow == NULL)
+ throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow ==
NULL");
+ actions = flow->stop - flow->start;
if (actions == 0)
throw(MAL, "dataflow", "Empty dataflow block");
/* initialize the eligible statements */
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list