Changeset: 01c82754c653 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/01c82754c653
Modified Files:
monetdb5/mal/mal_exception.c
monetdb5/modules/mal/tablet.c
Branch: resource_management
Log Message:
use tls allocator more rigorously in tablet code
diffs (truncated from 335 to 300 lines):
diff --git a/monetdb5/mal/mal_exception.c b/monetdb5/mal/mal_exception.c
--- a/monetdb5/mal/mal_exception.c
+++ b/monetdb5/mal/mal_exception.c
@@ -372,6 +372,6 @@ inline str
copyException(allocator *ma, const char *exception)
{
if (exception)
- return MA_STRDUP(ma, exception);
+ return ma ? MA_STRDUP(ma, exception) : (char *)exception;
return NULL;
}
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -638,11 +638,13 @@ tablet_read_more(READERtask *task)
/* note, the column value that is passed here is the 0 based value; the
* lineno value on the other hand is 1 based */
static void
-tablet_error(allocator *ma, READERtask *task, lng idx, lng lineno, int col,
const char *msg,
+tablet_error(READERtask *task, lng idx, lng lineno, int col, const char *msg,
const char *fcn)
{
assert(is_int_nil(col) || col >= 0);
assert(is_lng_nil(lineno) || lineno >= 1);
+ allocator *ma = task->cntxt->curprg->def->ma;
+ assert(ma);
MT_lock_set(&errorlock);
if (task->cntxt->error_row != NULL
&& (BUNappend(task->cntxt->error_row, &lineno, false) !=
GDK_SUCCEED
@@ -823,7 +825,7 @@ SQLload_error(allocator *ma, READERtask
sz += task->rseplen + 1;
s = line = ma_alloc(ma, sz);
if (line == NULL) {
- tablet_error(ma, task, idx, lng_nil, int_nil, "SQLload malloc
error",
+ tablet_error(task, idx, lng_nil, int_nil, "SQLload malloc
error",
"SQLload_error");
return NULL;
}
@@ -869,12 +871,16 @@ SQLinsert_val(allocator *ma, READERtask
|| GDKstrFromStr((unsigned char *) data,
(unsigned char *) s,
strlen(s),
'\0') < 0)
adt = NULL;
- else
+ else {
+ allocator *ma = task->cntxt->curprg->def->ma;
adt = fmt->frstr(ma, fmt, fmt->adt, data);
+ }
//if (data != buf)
// GDKfree(data);
- } else
+ } else {
+ allocator *ma = task->cntxt->curprg->def->ma;
adt = fmt->frstr(ma, fmt, fmt->adt, s);
+ }
}
lng row = task->cnt + idx + 1;
@@ -885,7 +891,7 @@ SQLinsert_val(allocator *ma, READERtask
size_t slen = mystrlen(s);
char *scpy = ma_alloc(ma, slen + 1);
if (scpy == NULL) {
- tablet_error(ma, task, idx, row, col,
+ tablet_error(task, idx, row, col,
SQLSTATE(HY013) MAL_MALLOC_FAIL, err);
task->besteffort = false; /* no
longer best effort */
//GDKfree(err);
@@ -897,7 +903,7 @@ SQLinsert_val(allocator *ma, READERtask
snprintf(buf, sizeof(buf), "'%s' expected%s%s%s",
fmt->type,
s ? " in '" : "", s ? s : "", s ? "'"
: "");
//GDKfree(s);
- tablet_error(ma, task, idx, row, col, buf, err);
+ tablet_error(task, idx, row, col, buf, err);
//GDKfree(err);
if (!task->besteffort)
return -1;
@@ -914,7 +920,7 @@ SQLinsert_val(allocator *ma, READERtask
if (task->rowerror) {
char *msg = GDKerrbuf;
err = SQLload_error(ma, task, idx, task->as->nr_attrs);
- tablet_error(ma, task, idx, row, col, msg
+ tablet_error(task, idx, row, col, msg
&& *msg ? msg : "insert failed", err);
//GDKfree(err);
}
@@ -923,12 +929,11 @@ SQLinsert_val(allocator *ma, READERtask
}
static int
-SQLworker_column(READERtask *task, int col)
+SQLworker_column(allocator *ma, READERtask *task, int col)
{
+ assert(ma);
int i;
Column *fmt = task->as->format;
- allocator *ma = task->cntxt->curprg->def->ma;
- assert(ma);
if (fmt[col].c == NULL)
return 0;
@@ -938,7 +943,7 @@ SQLworker_column(READERtask *task, int c
if (!fmt[col].skip
&& BATcapacity(fmt[col].c) < BATcount(fmt[col].c) + task->next)
{
if (BATextend(fmt[col].c, BATgrows(fmt[col].c) + task->limit)
!= GDK_SUCCEED) {
- tablet_error(ma, task, lng_nil, lng_nil, col,
+ tablet_error(task, lng_nil, lng_nil, col,
"Failed to extend the BAT\n",
"SQLworker_column");
MT_lock_unset(&mal_copyLock);
return -1;
@@ -994,7 +999,7 @@ SQLload_parse_row(allocator *ma, READERt
if (!row) {
errline = SQLload_error(ma, task, idx,
i + 1);
snprintf(errmsg, sizeof(errmsg), "Quote
(%c) missing", task->quote);
- tablet_error(ma, task, idx,
startlineno, (int) i, errmsg,
+ tablet_error(task, idx, startlineno,
(int) i, errmsg,
errline);
//GDKfree(errline);
error = true;
@@ -1021,7 +1026,7 @@ SQLload_parse_row(allocator *ma, READERt
if (i < as->nr_attrs - 1) {
errline = SQLload_error(ma, task, idx, i + 1);
/* it's the next value that is missing */
- tablet_error(ma, task, idx, startlineno, (int)
i + 1,
+ tablet_error(task, idx, startlineno, (int) i +
1,
"Column value
missing", errline);
//GDKfree(errline);
error = true;
@@ -1060,7 +1065,7 @@ SQLload_parse_row(allocator *ma, READERt
if (i < as->nr_attrs - 1) {
errline = SQLload_error(ma, task, idx, i + 1);
/* it's the next value that is missing */
- tablet_error(ma, task, idx, startlineno, (int)
i + 1,
+ tablet_error(task, idx, startlineno, (int) i +
1,
"Column value
missing", errline);
//GDKfree(errline);
error = true;
@@ -1082,7 +1087,7 @@ SQLload_parse_row(allocator *ma, READERt
if (row && *row && i == as->nr_attrs) {
errline = SQLload_error(ma, task, idx, task->as->nr_attrs);
snprintf(errmsg, sizeof(errmsg), "Leftover data '%s'", row);
- tablet_error(ma, task, idx, startlineno, (int) i, errmsg,
errline);
+ tablet_error(task, idx, startlineno, (int) i, errmsg, errline);
//GDKfree(errline);
error = true;
}
@@ -1103,10 +1108,12 @@ SQLworker(void *arg)
MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
//allocator *ma = task->cntxt ? ma_create(task->cntxt->curprg->def->ma)
: NULL;
//MT_thread_setallocator(ma);
- allocator *ma = task->cntxt->curprg->def->ma;
+ allocator *ma = MT_thread_getallocator();
+ assert(ma);
MT_sema_down(&task->sema);
while (task->top[task->cur] >= 0) {
+ allocator_state *ma_state = ma_open(ma);
/* stage one, break the rows spread the work over the workers */
switch (task->state) {
case BREAKROW:
@@ -1138,7 +1145,7 @@ SQLworker(void *arg)
for (i = 0; i < task->as->nr_attrs; i++)
if (task->cols[i]) {
t0 = GDKusec();
- if (SQLworker_column(task,
task->cols[i] - 1) < 0)
+ if (SQLworker_column(ma, task,
task->cols[i] - 1) < 0)
break;
t0 = GDKusec() - t0;
task->time[i] += t0;
@@ -1151,6 +1158,7 @@ SQLworker(void *arg)
}
MT_sema_up(&task->reply);
MT_sema_down(&task->sema);
+ ma_close_to(ma, ma_state);
}
MT_sema_up(&task->reply);
@@ -1275,8 +1283,7 @@ SQLproducer(void *p)
int more = 0;
//allocator *ma = task->cntxt ? ma_create(task->cntxt->curprg->def->ma)
: NULL;
//MT_thread_setallocator(ma);
- allocator *ma = task->cntxt->curprg->def->ma;
- allocator *ta = MT_thread_getallocator();
+ allocator *ma = MT_thread_getallocator();
MT_sema_down(&task->producer);
if (task->id < 0) {
@@ -1284,9 +1291,9 @@ SQLproducer(void *p)
}
MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
- rdfa = mkdfa(ta, (const unsigned char *) rsep, rseplen);
+ rdfa = mkdfa(ma, (const unsigned char *) rsep, rseplen);
if (rdfa == NULL) {
- tablet_error(ma, task, lng_nil, lng_nil, int_nil, "cannot
allocate memory",
+ tablet_error(task, lng_nil, lng_nil, int_nil, "cannot allocate
memory",
"");
ateof[cur] = true;
goto reportlackofinput;
@@ -1308,7 +1315,7 @@ SQLproducer(void *p)
// we may be reading from standard input and may be out of input
// warn the consumers
if (task->aborted || ((lineno & 8191) == 0 &&
bstream_getoob(task->cntxt->fdin))) {
- tablet_error(ma, task, rowno, lineno, int_nil,
+ tablet_error(task, rowno, lineno, int_nil,
"problem reported by client",
s);
ateof[cur] = true;
goto reportlackofinput;
@@ -1316,7 +1323,7 @@ SQLproducer(void *p)
if (ateof[cur] && partial) {
if (unlikely(partial)) {
- tablet_error(ma, task, rowno, lineno, int_nil,
+ tablet_error(task, rowno, lineno, int_nil,
"incomplete record at
end of file", s);
task->b->pos += partial;
}
@@ -1325,7 +1332,7 @@ SQLproducer(void *p)
if (task->errbuf && task->errbuf[0]) {
if (unlikely(GDKerrbuf && GDKerrbuf[0])) {
- tablet_error(ma, task, rowno, lineno, int_nil,
GDKerrbuf,
+ tablet_error(task, rowno, lineno, int_nil,
GDKerrbuf,
"SQLload_file");
/* TRC_DEBUG(MAL_SERVER, "Bailout on SQLload\n");*/
ateof[cur] = true;
@@ -1345,7 +1352,7 @@ SQLproducer(void *p)
/* the input buffer should be extended, but 'base' is
not shared
between the threads, which we can not now update.
Mimic an ateof instead; */
- tablet_error(ma, task, rowno, lineno, int_nil, "record
too long", "");
+ tablet_error(task, rowno, lineno, int_nil, "record too
long", "");
ateof[cur] = true;
/* TRC_DEBUG(MAL_SERVER, "Bailout on SQLload confronted
with too large record\n");*/
goto reportlackofinput;
@@ -1442,7 +1449,7 @@ SQLproducer(void *p)
/* found an incomplete record, saved for next
round */
if (unlikely(s + partial < end)) {
/* found a EOS in the input */
- tablet_error(ma, task, rowno,
startlineno, int_nil,
+ tablet_error(task, rowno, startlineno,
int_nil,
"record too
long (EOS found)", "");
ateof[cur] = true;
goto reportlackofinput;
@@ -1544,8 +1551,9 @@ SQLproducer(void *p)
if (unlikely(cnt < task->maxrow && task->maxrow != BUN_NONE)) {
char msg[256];
snprintf(msg, sizeof(msg), "incomplete record at end of
file:%s\n", s);
- task->as->error = GDKstrdup(msg);
- tablet_error(ma, task, rowno, startlineno, int_nil,
+ allocator *ma = task->cntxt->curprg->def->ma;
+ task->as->error = MA_STRDUP(ma, msg);
+ tablet_error(task, rowno, startlineno, int_nil,
"incomplete record at end of file", s);
task->b->pos += partial;
}
@@ -1555,7 +1563,7 @@ SQLproducer(void *p)
return;
badutf8:
- tablet_error(ma, task, rowno, startlineno, int_nil,
+ tablet_error(task, rowno, startlineno, int_nil,
"input not properly encoded UTF-8", "");
ateof[cur] = true;
goto reportlackofinput;
@@ -1622,7 +1630,7 @@ SQLload_file(Client cntxt, Tablet *as, b
create_rejects_table(task.cntxt);
if (task.cntxt->error_row == NULL || task.cntxt->error_fld == NULL
|| task.cntxt->error_msg == NULL || task.cntxt->error_input ==
NULL) {
- tablet_error(ma, &task, lng_nil, lng_nil, int_nil,
+ tablet_error(&task, lng_nil, lng_nil, int_nil,
"SQLload initialization failed", "");
/* nothing allocated yet, so nothing to free */
return BUN_NONE;
@@ -1635,7 +1643,7 @@ SQLload_file(Client cntxt, Tablet *as, b
task.cols = (int *) ma_zalloc(ma, as->nr_attrs * sizeof(int));
task.time = (lng *) ma_zalloc(ma, as->nr_attrs * sizeof(lng));
if (task.fields == NULL || task.cols == NULL || task.time == NULL) {
- tablet_error(ma, &task, lng_nil, lng_nil, int_nil,
+ tablet_error(&task, lng_nil, lng_nil, int_nil,
"memory allocation failed",
"SQLload_file");
goto bailout;
}
@@ -1644,7 +1652,7 @@ SQLload_file(Client cntxt, Tablet *as, b
task.base[i] = ma_alloc(ma, MAXROWSIZE(2 * b->size) + 2);
task.rowlimit[i] = MAXROWSIZE(2 * b->size);
if (task.base[i] == NULL) {
- tablet_error(ma, &task, lng_nil, lng_nil, int_nil,
+ tablet_error(&task, lng_nil, lng_nil, int_nil,
SQLSTATE(HY013)
MAL_MALLOC_FAIL, "SQLload_file");
goto bailout;
}
@@ -1697,7 +1705,7 @@ SQLload_file(Client cntxt, Tablet *as, b
if (task.rows[i] == NULL || task.startlineno[i] == NULL) {
//GDKfree(task.rows[i]);
//GDKfree(task.startlineno[i]);
- tablet_error(ma, &task, lng_nil, lng_nil, int_nil,
+ tablet_error(&task, lng_nil, lng_nil, int_nil,
SQLSTATE(HY013)
MAL_MALLOC_FAIL,
"SQLload_file:failed to alloc
buffers");
goto bailout;
@@ -1705,7 +1713,7 @@ SQLload_file(Client cntxt, Tablet *as, b
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]