Changeset: 2b0c567ed5c5 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/2b0c567ed5c5
Added Files:
        sql/test/SQLite_regress/sqllogictest/Tests/select6.timeout
Modified Files:
        gdk/gdk.h
        gdk/gdk_utils.c
        gdk/gdk_value.c
        monetdb5/mal/mal.h
        monetdb5/mal/mal_client.c
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_interpreter.c
        monetdb5/mal/mal_interpreter.h
        monetdb5/mal/mal_session.c
        monetdb5/mal/mal_stack.c
        monetdb5/mal/mal_stack.h
        monetdb5/modules/mal/groupby.c
        monetdb5/modules/mal/manifold.c
        monetdb5/modules/mal/orderidx.c
        monetdb5/modules/mal/remote.c
        monetdb5/modules/mal/remote.h
        monetdb5/optimizer/opt_evaluate.c
        sql/backends/monet5/sql.c
        sql/backends/monet5/sql_cast.c
        sql/server/sql_atom.c
        sql/test/SQLite_regress/sqllogictest/Tests/All
        tools/monetdbe/monetdbe.c
Branch: resource_management
Log Message:

allocator needs lock in dataflow execution


diffs (truncated from 841 to 300 lines):

diff --git a/gdk/gdk.h b/gdk/gdk.h
--- a/gdk/gdk.h
+++ b/gdk/gdk.h
@@ -2539,6 +2539,8 @@ typedef struct allocator {
        size_t tmp_used; /* keeps total of tmp allocated bytes */
        bool tmp_active; /* currently only one level of temp usage */
        exception_buffer eb;
+       MT_Lock lock;    /* lock for thread-safe allocations */
+       bool locked;
 } allocator;
 
 gdk_export ValPtr VALcopy(allocator *va, ValPtr dst, const ValRecord *src)
@@ -2565,6 +2567,7 @@ gdk_export void sa_free( allocator *sa, 
 #define ma_destroy(ma)         sa_destroy(ma)
 #define ma_alloc(ma, sz)       (void*)sa_alloc(ma, sz)
 #define ma_zalloc(ma, sz)      (void*)sa_zalloc(ma, sz)
+#define ma_realloc(ma, obj, sz, osz) (void *) sa_realloc(ma, obj, sz, osz)
 #define ma_open(ma)            sa_open(ma)
 #define ma_close(ma)           sa_close(ma)
 #define ma_free(ma, obj)       sa_free(ma, obj)
diff --git a/gdk/gdk_utils.c b/gdk/gdk_utils.c
--- a/gdk/gdk_utils.c
+++ b/gdk/gdk_utils.c
@@ -2389,6 +2389,8 @@ create_allocator(allocator *pa)
        sa->free_blk_hits = 0;
        sa->tmp_active = 0;
        sa->tmp_used = 0;
+       MT_lock_init(&sa->lock, "allocator_lock");
+       sa->locked = false;
        return sa;
 }
 
@@ -2407,6 +2409,7 @@ void
 sa_destroy(allocator *sa)
 {
        if (sa) {
+               MT_lock_destroy(&sa->lock);
                bool root_allocator = sa->pa == NULL;
                for (size_t i = 0; i < sa->nr; i++) {
                        char *next = sa->blks[i];
diff --git a/gdk/gdk_value.c b/gdk/gdk_value.c
--- a/gdk/gdk_value.c
+++ b/gdk/gdk_value.c
@@ -158,12 +158,13 @@ VALempty(ValPtr v)
 ValPtr
 VALcopy(allocator *va, ValPtr d, const ValRecord *s)
 {
-       if (d == s && !va)
+       if (d == s) {
                return d;
-       d->bat = false;
+       }
+       *d = *s;
        d->allocated = !va;
        if (s->bat || !ATOMextern(s->vtype)) {
-               *d = *s;
+               //*d = *s;
        } else if (s->val.pval == NULL) {
                return VALinit(va, d, s->vtype, ATOMnilptr(s->vtype));
        } else if (s->vtype == TYPE_str) {
diff --git a/monetdb5/mal/mal.h b/monetdb5/mal/mal.h
--- a/monetdb5/mal/mal.h
+++ b/monetdb5/mal/mal.h
@@ -199,6 +199,7 @@ typedef struct MALBLK {
 typedef int (*DFhook)(void *, void *, void *, void *);
 
 typedef struct MALSTK {
+       bool allocated;
        int stksize;
        int stktop;
        int stkbot;                                     /* the first variable 
to be initialized */
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -428,7 +428,7 @@ MCcloseClient(Client c)
        c->client_pid = 0;
        c->mythread = NULL;
        if (c->glb) {
-               //freeStack(c->glb);
+               freeStack(c->glb);
                c->glb = NULL;
        }
        if (c->profticks) {
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -335,8 +335,18 @@ DFLOWworker(void *T)
                                if (ATOMIC_CAS(&flow->mb->workers, &mwrks, 
wrks))
                                        break;
                        }
-                       error = runMALsequence(flow->cntxt, flow->mb, fe->pc, 
fe->pc + 1,
+
+                       allocator *pa = flow->cntxt->alloc;
+                       MT_lock_set(&pa->lock);
+                       // TODO perhaps use tiny size allocator!
+                       allocator *ta = ma_create(pa);
+                       MT_lock_unset(&pa->lock);
+                       error = runMALsequence(ta, flow->cntxt, flow->mb, 
fe->pc, fe->pc + 1,
                                                                   flow->stk, 
0, 0);
+                       MT_lock_set(&pa->lock);
+                       ma_destroy(ta);
+                       MT_lock_unset(&pa->lock);
+
                        ATOMIC_DEC(&flow->cntxt->workers);
                        /* release the memory claim */
                        MALadmission_release(flow->cntxt, flow->mb, flow->stk, 
p, claim);
@@ -532,14 +542,14 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
                throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == 
NULL");
        if (mb == NULL)
                throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == 
NULL");
-       assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
+       assign = (int *) ma_zalloc(mb->ma, mb->vtop * sizeof(int));
        if (assign == NULL)
                throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
        etop = flow->stop - flow->start;
        for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) {
                p = getInstrPtr(mb, pc);
                if (p == NULL) {
-                       GDKfree(assign);
+                       //GDKfree(assign);
                        throw(MAL, "dataflow",
                                  "DFLOWinitBlk(): getInstrPtr() returned 
NULL");
                }
@@ -568,25 +578,24 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
                                        etop++;
                                        (void) size;
                                        if (etop == size) {
-                                               int *tmp;
+                                               //int *tmp;
                                                /* in case of realloc failure, 
the original
                                                 * pointers will be freed by 
the caller */
-                                               tmp = (int *) 
GDKrealloc(flow->nodes,
-                                                                               
                 sizeof(int) * 2 * size);
-                                               if (tmp == NULL) {
-                                                       GDKfree(assign);
+                                               size_t nsz = sizeof(int) * 2 * 
size;
+                                               flow->nodes = (int *) 
ma_realloc(mb->ma, flow->nodes, nsz, size);
+                                               if (flow->nodes == NULL) {
+                                                       // GDKfree(assign);
                                                        throw(MAL, "dataflow",
                                                                  
SQLSTATE(HY013) MAL_MALLOC_FAIL);
                                                }
-                                               flow->nodes = tmp;
-                                               tmp = (int *) 
GDKrealloc(flow->edges,
-                                                                               
                 sizeof(int) * 2 * size);
-                                               if (tmp == NULL) {
-                                                       GDKfree(assign);
+                                               //flow->nodes = tmp;
+                                               flow->edges = (int *) 
ma_realloc(mb->ma, flow->edges, nsz, size);
+                                               if (flow->edges == NULL) {
+                                                       // GDKfree(assign);
                                                        throw(MAL, "dataflow",
                                                                  
SQLSTATE(HY013) MAL_MALLOC_FAIL);
                                                }
-                                               flow->edges = tmp;
+                                               //flow->edges = tmp;
                                                size *= 2;
                                        }
                                } else {
@@ -615,25 +624,24 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
                                                flow->edges[i] = etop;
                                                etop++;
                                                if (etop == size) {
-                                                       int *tmp;
+                                                       //int *tmp;
+                                                       size_t nsz = 
sizeof(int) * 2 * size;
                                                        /* in case of realloc 
failure, the original
                                                         * pointers will be 
freed by the caller */
-                                                       tmp = (int *) 
GDKrealloc(flow->nodes,
-                                                                               
                         sizeof(int) * 2 * size);
-                                                       if (tmp == NULL) {
-                                                               GDKfree(assign);
+                                                       flow->nodes = (int *) 
ma_realloc(mb->ma, flow->nodes, nsz, size);
+                                                       if (flow->nodes == 
NULL) {
+                                                               
//GDKfree(assign);
                                                                throw(MAL, 
"dataflow",
                                                                          
SQLSTATE(HY013) MAL_MALLOC_FAIL);
                                                        }
-                                                       flow->nodes = tmp;
-                                                       tmp = (int *) 
GDKrealloc(flow->edges,
-                                                                               
                         sizeof(int) * 2 * size);
-                                                       if (tmp == NULL) {
-                                                               GDKfree(assign);
+                                                       //flow->nodes = tmp;
+                                                       flow->edges = (int *) 
ma_realloc(mb->ma, flow->edges, nsz, size);
+                                                       if (flow->edges == 
NULL) {
+                                                               
//GDKfree(assign);
                                                                throw(MAL, 
"dataflow",
                                                                          
SQLSTATE(HY013) MAL_MALLOC_FAIL);
                                                        }
-                                                       flow->edges = tmp;
+                                                       //flow->edges = tmp;
                                                        size *= 2;
                                                }
                                        } else {
@@ -648,7 +656,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
                for (j = 0; j < p->retc; j++)
                        assign[getArg(p, j)] = pc;      /* ensure recognition 
of dependency on first instruction and constant */
        }
-       GDKfree(assign);
+       //GDKfree(assign);
 
        return MAL_SUCCEED;
 }
@@ -817,7 +825,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
                assert(t->flag == FREE);
                assert(free_count > 0);
                free_count--;
-               free_workers = t->next;
+               free_workers = free_workers->next;
                t->next = workers;
                workers = t;
                t->flag = WAITING;
@@ -850,7 +858,8 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        }
        MT_lock_unset(&dataflowLock);
 
-       flow = (DataFlow) GDKzalloc(sizeof(DataFlowRec));
+       flow = (DataFlow) ma_zalloc(mb->ma, sizeof(DataFlowRec));
+
        if (flow == NULL)
                throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
 
@@ -866,28 +875,28 @@ runMALdataflow(Client cntxt, MalBlkPtr m
                .start = startpc + 1,
                .stop = stoppc,
                .done = q_create("flow->done"),
-               .status = (FlowEvent) GDKzalloc((stoppc - startpc + 1) *
+               .status = (FlowEvent) ma_zalloc(mb->ma, (stoppc - startpc + 1) *
                                                                                
sizeof(FlowEventRec)),
                .error = ATOMIC_PTR_VAR_INIT(NULL),
-               .nodes = (int *) GDKzalloc(sizeof(int) * size),
-               .edges = (int *) GDKzalloc(sizeof(int) * size),
+               .nodes = (int *) ma_zalloc(mb->ma, sizeof(int) * size),
+               .edges = (int *) ma_zalloc(mb->ma, sizeof(int) * size),
        };
 
        if (flow->done == NULL) {
-               GDKfree(flow->status);
-               GDKfree(flow->nodes);
-               GDKfree(flow->edges);
-               GDKfree(flow);
+               //GDKfree(flow->status);
+               //GDKfree(flow->nodes);
+               //GDKfree(flow->edges);
+               //GDKfree(flow);
                throw(MAL, "dataflow",
                          "runMALdataflow(): Failed to create flow->done 
queue");
        }
 
        if (flow->status == NULL || flow->nodes == NULL || flow->edges == NULL) 
{
                q_destroy(flow->done);
-               GDKfree(flow->status);
-               GDKfree(flow->nodes);
-               GDKfree(flow->edges);
-               GDKfree(flow);
+               //GDKfree(flow->status);
+               //GDKfree(flow->nodes);
+               //GDKfree(flow->edges);
+               //GDKfree(flow);
                throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
        }
 
@@ -897,12 +906,12 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        if (msg == MAL_SUCCEED)
                msg = DFLOWscheduler(flow, t);
 
-       GDKfree(flow->status);
-       GDKfree(flow->edges);
-       GDKfree(flow->nodes);
+       //GDKfree(flow->status);
+       //GDKfree(flow->edges);
+       //GDKfree(flow->nodes);
        q_destroy(flow->done);
        MT_lock_destroy(&flow->flowlock);
-       GDKfree(flow);
+       //GDKfree(flow);
 
        /* we created one worker, now tell one worker to exit again */
        MT_lock_set(&todo->l);
diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c
--- a/monetdb5/mal/mal_interpreter.c
+++ b/monetdb5/mal/mal_interpreter.c
@@ -275,6 +275,7 @@ malCommandCall(Client cntxt, MalStkPtr s
                                lhs->val.pval = 0;                              
        \
                                lhs->len = 0;                                   
        \
                                lhs->bat = isaBatType(getVarType(mb, i));       
        \
+                               lhs->allocated = !A; \
                        }                                                       
                        \
                }                                                               
                        \
        } while (0)
@@ -289,19 +290,19 @@ isNotUsedIn(InstrPtr p, int start, int a
 }
 
 MalStkPtr
-prepareMALstack(MalBlkPtr mb, int size)
+prepareMALstack(allocator *pa, MalBlkPtr mb, int size)
 {
        MalStkPtr stk = NULL;
        int res = 1;
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to