Changeset: 2b0c567ed5c5 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/2b0c567ed5c5
Added Files:
sql/test/SQLite_regress/sqllogictest/Tests/select6.timeout
Modified Files:
gdk/gdk.h
gdk/gdk_utils.c
gdk/gdk_value.c
monetdb5/mal/mal.h
monetdb5/mal/mal_client.c
monetdb5/mal/mal_dataflow.c
monetdb5/mal/mal_interpreter.c
monetdb5/mal/mal_interpreter.h
monetdb5/mal/mal_session.c
monetdb5/mal/mal_stack.c
monetdb5/mal/mal_stack.h
monetdb5/modules/mal/groupby.c
monetdb5/modules/mal/manifold.c
monetdb5/modules/mal/orderidx.c
monetdb5/modules/mal/remote.c
monetdb5/modules/mal/remote.h
monetdb5/optimizer/opt_evaluate.c
sql/backends/monet5/sql.c
sql/backends/monet5/sql_cast.c
sql/server/sql_atom.c
sql/test/SQLite_regress/sqllogictest/Tests/All
tools/monetdbe/monetdbe.c
Branch: resource_management
Log Message:
allocator needs lock in dataflow execution
diffs (truncated from 841 to 300 lines):
diff --git a/gdk/gdk.h b/gdk/gdk.h
--- a/gdk/gdk.h
+++ b/gdk/gdk.h
@@ -2539,6 +2539,8 @@ typedef struct allocator {
size_t tmp_used; /* keeps total of tmp allocated bytes */
bool tmp_active; /* currently only one level of temp usage */
exception_buffer eb;
+ MT_Lock lock; /* lock for thread-safe allocations */
+ bool locked;
} allocator;
gdk_export ValPtr VALcopy(allocator *va, ValPtr dst, const ValRecord *src)
@@ -2565,6 +2567,7 @@ gdk_export void sa_free( allocator *sa,
#define ma_destroy(ma) sa_destroy(ma)
#define ma_alloc(ma, sz) (void*)sa_alloc(ma, sz)
#define ma_zalloc(ma, sz) (void*)sa_zalloc(ma, sz)
+#define ma_realloc(ma, obj, sz, osz) (void *) sa_realloc(ma, obj, sz, osz)
#define ma_open(ma) sa_open(ma)
#define ma_close(ma) sa_close(ma)
#define ma_free(ma, obj) sa_free(ma, obj)
diff --git a/gdk/gdk_utils.c b/gdk/gdk_utils.c
--- a/gdk/gdk_utils.c
+++ b/gdk/gdk_utils.c
@@ -2389,6 +2389,8 @@ create_allocator(allocator *pa)
sa->free_blk_hits = 0;
sa->tmp_active = 0;
sa->tmp_used = 0;
+ MT_lock_init(&sa->lock, "allocator_lock");
+ sa->locked = false;
return sa;
}
@@ -2407,6 +2409,7 @@ void
sa_destroy(allocator *sa)
{
if (sa) {
+ MT_lock_destroy(&sa->lock);
bool root_allocator = sa->pa == NULL;
for (size_t i = 0; i < sa->nr; i++) {
char *next = sa->blks[i];
diff --git a/gdk/gdk_value.c b/gdk/gdk_value.c
--- a/gdk/gdk_value.c
+++ b/gdk/gdk_value.c
@@ -158,12 +158,13 @@ VALempty(ValPtr v)
ValPtr
VALcopy(allocator *va, ValPtr d, const ValRecord *s)
{
- if (d == s && !va)
+ if (d == s) {
return d;
- d->bat = false;
+ }
+ *d = *s;
d->allocated = !va;
if (s->bat || !ATOMextern(s->vtype)) {
- *d = *s;
+ //*d = *s;
} else if (s->val.pval == NULL) {
return VALinit(va, d, s->vtype, ATOMnilptr(s->vtype));
} else if (s->vtype == TYPE_str) {
diff --git a/monetdb5/mal/mal.h b/monetdb5/mal/mal.h
--- a/monetdb5/mal/mal.h
+++ b/monetdb5/mal/mal.h
@@ -199,6 +199,7 @@ typedef struct MALBLK {
typedef int (*DFhook)(void *, void *, void *, void *);
typedef struct MALSTK {
+ bool allocated;
int stksize;
int stktop;
int stkbot; /* the first variable
to be initialized */
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -428,7 +428,7 @@ MCcloseClient(Client c)
c->client_pid = 0;
c->mythread = NULL;
if (c->glb) {
- //freeStack(c->glb);
+ freeStack(c->glb);
c->glb = NULL;
}
if (c->profticks) {
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -335,8 +335,18 @@ DFLOWworker(void *T)
if (ATOMIC_CAS(&flow->mb->workers, &mwrks,
wrks))
break;
}
- error = runMALsequence(flow->cntxt, flow->mb, fe->pc,
fe->pc + 1,
+
+ allocator *pa = flow->cntxt->alloc;
+ MT_lock_set(&pa->lock);
+ // TODO perhaps use tiny size allocator!
+ allocator *ta = ma_create(pa);
+ MT_lock_unset(&pa->lock);
+ error = runMALsequence(ta, flow->cntxt, flow->mb,
fe->pc, fe->pc + 1,
flow->stk,
0, 0);
+ MT_lock_set(&pa->lock);
+ ma_destroy(ta);
+ MT_lock_unset(&pa->lock);
+
ATOMIC_DEC(&flow->cntxt->workers);
/* release the memory claim */
MALadmission_release(flow->cntxt, flow->mb, flow->stk,
p, claim);
@@ -532,14 +542,14 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow ==
NULL");
if (mb == NULL)
throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb ==
NULL");
- assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
+ assign = (int *) ma_zalloc(mb->ma, mb->vtop * sizeof(int));
if (assign == NULL)
throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
etop = flow->stop - flow->start;
for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) {
p = getInstrPtr(mb, pc);
if (p == NULL) {
- GDKfree(assign);
+ //GDKfree(assign);
throw(MAL, "dataflow",
"DFLOWinitBlk(): getInstrPtr() returned
NULL");
}
@@ -568,25 +578,24 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
etop++;
(void) size;
if (etop == size) {
- int *tmp;
+ //int *tmp;
/* in case of realloc failure,
the original
* pointers will be freed by
the caller */
- tmp = (int *)
GDKrealloc(flow->nodes,
-
sizeof(int) * 2 * size);
- if (tmp == NULL) {
- GDKfree(assign);
+ size_t nsz = sizeof(int) * 2 *
size;
+ flow->nodes = (int *)
ma_realloc(mb->ma, flow->nodes, nsz, size);
+ if (flow->nodes == NULL) {
+ // GDKfree(assign);
throw(MAL, "dataflow",
SQLSTATE(HY013) MAL_MALLOC_FAIL);
}
- flow->nodes = tmp;
- tmp = (int *)
GDKrealloc(flow->edges,
-
sizeof(int) * 2 * size);
- if (tmp == NULL) {
- GDKfree(assign);
+ //flow->nodes = tmp;
+ flow->edges = (int *)
ma_realloc(mb->ma, flow->edges, nsz, size);
+ if (flow->edges == NULL) {
+ // GDKfree(assign);
throw(MAL, "dataflow",
SQLSTATE(HY013) MAL_MALLOC_FAIL);
}
- flow->edges = tmp;
+ //flow->edges = tmp;
size *= 2;
}
} else {
@@ -615,25 +624,24 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
flow->edges[i] = etop;
etop++;
if (etop == size) {
- int *tmp;
+ //int *tmp;
+ size_t nsz =
sizeof(int) * 2 * size;
/* in case of realloc
failure, the original
* pointers will be
freed by the caller */
- tmp = (int *)
GDKrealloc(flow->nodes,
-
sizeof(int) * 2 * size);
- if (tmp == NULL) {
- GDKfree(assign);
+ flow->nodes = (int *)
ma_realloc(mb->ma, flow->nodes, nsz, size);
+ if (flow->nodes ==
NULL) {
+
//GDKfree(assign);
throw(MAL,
"dataflow",
SQLSTATE(HY013) MAL_MALLOC_FAIL);
}
- flow->nodes = tmp;
- tmp = (int *)
GDKrealloc(flow->edges,
-
sizeof(int) * 2 * size);
- if (tmp == NULL) {
- GDKfree(assign);
+ //flow->nodes = tmp;
+ flow->edges = (int *)
ma_realloc(mb->ma, flow->edges, nsz, size);
+ if (flow->edges ==
NULL) {
+
//GDKfree(assign);
throw(MAL,
"dataflow",
SQLSTATE(HY013) MAL_MALLOC_FAIL);
}
- flow->edges = tmp;
+ //flow->edges = tmp;
size *= 2;
}
} else {
@@ -648,7 +656,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
for (j = 0; j < p->retc; j++)
assign[getArg(p, j)] = pc; /* ensure recognition
of dependency on first instruction and constant */
}
- GDKfree(assign);
+ //GDKfree(assign);
return MAL_SUCCEED;
}
@@ -817,7 +825,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
assert(t->flag == FREE);
assert(free_count > 0);
free_count--;
- free_workers = t->next;
+ free_workers = free_workers->next;
t->next = workers;
workers = t;
t->flag = WAITING;
@@ -850,7 +858,8 @@ runMALdataflow(Client cntxt, MalBlkPtr m
}
MT_lock_unset(&dataflowLock);
- flow = (DataFlow) GDKzalloc(sizeof(DataFlowRec));
+ flow = (DataFlow) ma_zalloc(mb->ma, sizeof(DataFlowRec));
+
if (flow == NULL)
throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
@@ -866,28 +875,28 @@ runMALdataflow(Client cntxt, MalBlkPtr m
.start = startpc + 1,
.stop = stoppc,
.done = q_create("flow->done"),
- .status = (FlowEvent) GDKzalloc((stoppc - startpc + 1) *
+ .status = (FlowEvent) ma_zalloc(mb->ma, (stoppc - startpc + 1) *
sizeof(FlowEventRec)),
.error = ATOMIC_PTR_VAR_INIT(NULL),
- .nodes = (int *) GDKzalloc(sizeof(int) * size),
- .edges = (int *) GDKzalloc(sizeof(int) * size),
+ .nodes = (int *) ma_zalloc(mb->ma, sizeof(int) * size),
+ .edges = (int *) ma_zalloc(mb->ma, sizeof(int) * size),
};
if (flow->done == NULL) {
- GDKfree(flow->status);
- GDKfree(flow->nodes);
- GDKfree(flow->edges);
- GDKfree(flow);
+ //GDKfree(flow->status);
+ //GDKfree(flow->nodes);
+ //GDKfree(flow->edges);
+ //GDKfree(flow);
throw(MAL, "dataflow",
"runMALdataflow(): Failed to create flow->done
queue");
}
if (flow->status == NULL || flow->nodes == NULL || flow->edges == NULL)
{
q_destroy(flow->done);
- GDKfree(flow->status);
- GDKfree(flow->nodes);
- GDKfree(flow->edges);
- GDKfree(flow);
+ //GDKfree(flow->status);
+ //GDKfree(flow->nodes);
+ //GDKfree(flow->edges);
+ //GDKfree(flow);
throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
}
@@ -897,12 +906,12 @@ runMALdataflow(Client cntxt, MalBlkPtr m
if (msg == MAL_SUCCEED)
msg = DFLOWscheduler(flow, t);
- GDKfree(flow->status);
- GDKfree(flow->edges);
- GDKfree(flow->nodes);
+ //GDKfree(flow->status);
+ //GDKfree(flow->edges);
+ //GDKfree(flow->nodes);
q_destroy(flow->done);
MT_lock_destroy(&flow->flowlock);
- GDKfree(flow);
+ //GDKfree(flow);
/* we created one worker, now tell one worker to exit again */
MT_lock_set(&todo->l);
diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c
--- a/monetdb5/mal/mal_interpreter.c
+++ b/monetdb5/mal/mal_interpreter.c
@@ -275,6 +275,7 @@ malCommandCall(Client cntxt, MalStkPtr s
lhs->val.pval = 0;
\
lhs->len = 0;
\
lhs->bat = isaBatType(getVarType(mb, i));
\
+ lhs->allocated = !A; \
}
\
}
\
} while (0)
@@ -289,19 +290,19 @@ isNotUsedIn(InstrPtr p, int start, int a
}
MalStkPtr
-prepareMALstack(MalBlkPtr mb, int size)
+prepareMALstack(allocator *pa, MalBlkPtr mb, int size)
{
MalStkPtr stk = NULL;
int res = 1;
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-leave@monetdb.org