Changeset: 6739ad5dff63 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6739ad5dff63
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: Oct2014
Log Message:

Remove superfluous test, and properly lock concurrent access.


diffs (80 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -343,47 +343,47 @@ DFLOWworker(void *T)
                assert(flow);
 
                /* whenever we have a (concurrent) error, skip it */
+               MT_lock_set(&flow->flowlock, "DFLOWworker");
                if (flow->error) {
+                       MT_lock_unset(&flow->flowlock, "DFLOWworker");
                        q_enqueue(flow->done, fe);
                        continue;
                }
+               MT_lock_unset(&flow->flowlock, "DFLOWworker");
 
-               /* skip all instructions when we have encontered an error */
-               if (flow->error == 0) {
 #ifdef USE_MAL_ADMISSION
-                       if (MALadmission(fe->argclaim, fe->hotclaim)) {
-                               fe->hotclaim = 0;   /* don't assume priority anymore */
-                               if (todo->last == 0)
-                                       MT_sleep_ms(DELAYUNIT);
-                               q_requeue(todo, fe);
-                               continue;
-                       }
+               if (MALadmission(fe->argclaim, fe->hotclaim)) {
+                       fe->hotclaim = 0;   /* don't assume priority anymore */
+                       if (todo->last == 0)
+                               MT_sleep_ms(DELAYUNIT);
+                       q_requeue(todo, fe);
+                       continue;
+               }
 #endif
-                       error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0);
-                       PARDEBUG fprintf(stderr, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n",
-                                                        fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : "");
+               error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0);
+               PARDEBUG fprintf(stderr, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n",
+                                                fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : "");
 #ifdef USE_MAL_ADMISSION
-                       /* release the memory claim */
-                       MALadmission(-fe->argclaim, -fe->hotclaim);
+               /* release the memory claim */
+               MALadmission(-fe->argclaim, -fe->hotclaim);
 #endif
-                       /* update the numa information. keep the thread-id producing the value */
-                       p= getInstrPtr(flow->mb,fe->pc);
-                       for( i = 0; i < p->argc; i++)
-                               flow->mb->var[getArg(p,i)]->worker = thr->tid;
+               /* update the numa information. keep the thread-id producing the value */
+               p= getInstrPtr(flow->mb,fe->pc);
+               for( i = 0; i < p->argc; i++)
+                       flow->mb->var[getArg(p,i)]->worker = thr->tid;
 
+               MT_lock_set(&flow->flowlock, "DFLOWworker");
+               fe->state = DFLOWwrapup;
+               MT_lock_unset(&flow->flowlock, "DFLOWworker");
+               if (error) {
                        MT_lock_set(&flow->flowlock, "DFLOWworker");
-                       fe->state = DFLOWwrapup;
+                       /* only collect one error (from one thread, needed for stable testing) */
+                       if (!flow->error)
+                               flow->error = error;
                        MT_lock_unset(&flow->flowlock, "DFLOWworker");
-                       if (error) {
-                               MT_lock_set(&flow->flowlock, "DFLOWworker");
-                               /* only collect one error (from one thread, needed for stable testing) */
-                               if (!flow->error)
-                                       flow->error = error;
-                               MT_lock_unset(&flow->flowlock, "DFLOWworker");
-                               /* after an error we skip the rest of the block */
-                               q_enqueue(flow->done, fe);
-                               continue;
-                       }
+                       /* after an error we skip the rest of the block */
+                       q_enqueue(flow->done, fe);
+                       continue;
                }
 
                /* see if you can find an eligible instruction that uses the
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to