Changeset: 01c82754c653 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/01c82754c653
Modified Files:
        monetdb5/mal/mal_exception.c
        monetdb5/modules/mal/tablet.c
Branch: resource_management
Log Message:

Use the thread-local (TLS) allocator more rigorously in the tablet code.


diffs (truncated from 335 to 300 lines):

diff --git a/monetdb5/mal/mal_exception.c b/monetdb5/mal/mal_exception.c
--- a/monetdb5/mal/mal_exception.c
+++ b/monetdb5/mal/mal_exception.c
@@ -372,6 +372,6 @@ inline str
 copyException(allocator *ma, const char *exception)
 {
        if (exception)
-               return MA_STRDUP(ma, exception);
+               return ma ? MA_STRDUP(ma, exception) : (char *)exception;
        return NULL;
 }
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -638,11 +638,13 @@ tablet_read_more(READERtask *task)
 /* note, the column value that is passed here is the 0 based value; the
  * lineno value on the other hand is 1 based */
 static void
-tablet_error(allocator *ma, READERtask *task, lng idx, lng lineno, int col, const char *msg,
+tablet_error(READERtask *task, lng idx, lng lineno, int col, const char *msg,
                         const char *fcn)
 {
        assert(is_int_nil(col) || col >= 0);
        assert(is_lng_nil(lineno) || lineno >= 1);
+       allocator *ma = task->cntxt->curprg->def->ma;
+       assert(ma);
        MT_lock_set(&errorlock);
        if (task->cntxt->error_row != NULL
                && (BUNappend(task->cntxt->error_row, &lineno, false) != GDK_SUCCEED
@@ -823,7 +825,7 @@ SQLload_error(allocator *ma, READERtask 
        sz += task->rseplen + 1;
        s = line = ma_alloc(ma, sz);
        if (line == NULL) {
-               tablet_error(ma, task, idx, lng_nil, int_nil, "SQLload malloc error",
+               tablet_error(task, idx, lng_nil, int_nil, "SQLload malloc error",
                                         "SQLload_error");
                return NULL;
        }
@@ -869,12 +871,16 @@ SQLinsert_val(allocator *ma, READERtask 
                                || GDKstrFromStr((unsigned char *) data, 
(unsigned char *) s,
                                                                 strlen(s), 
'\0') < 0)
                                adt = NULL;
-                       else
+                       else {
+                               allocator *ma = task->cntxt->curprg->def->ma;
                                adt = fmt->frstr(ma, fmt, fmt->adt, data);
+                       }
                        //if (data != buf)
                        //      GDKfree(data);
-               } else
+               } else {
+                       allocator *ma = task->cntxt->curprg->def->ma;
                        adt = fmt->frstr(ma, fmt, fmt->adt, s);
+               }
        }
 
        lng row = task->cnt + idx + 1;
@@ -885,7 +891,7 @@ SQLinsert_val(allocator *ma, READERtask 
                                size_t slen = mystrlen(s);
                                char *scpy = ma_alloc(ma, slen + 1);
                                if (scpy == NULL) {
-                                       tablet_error(ma, task, idx, row, col,
+                                       tablet_error(task, idx, row, col,
                                                                 
SQLSTATE(HY013) MAL_MALLOC_FAIL, err);
                                        task->besteffort = false;       /* no 
longer best effort */
                                        //GDKfree(err);
@@ -897,7 +903,7 @@ SQLinsert_val(allocator *ma, READERtask 
                        snprintf(buf, sizeof(buf), "'%s' expected%s%s%s", 
fmt->type,
                                         s ? " in '" : "", s ? s : "", s ? "'" 
: "");
                        //GDKfree(s);
-                       tablet_error(ma, task, idx, row, col, buf, err);
+                       tablet_error(task, idx, row, col, buf, err);
                        //GDKfree(err);
                        if (!task->besteffort)
                                return -1;
@@ -914,7 +920,7 @@ SQLinsert_val(allocator *ma, READERtask 
        if (task->rowerror) {
                char *msg = GDKerrbuf;
                err = SQLload_error(ma, task, idx, task->as->nr_attrs);
-               tablet_error(ma, task, idx, row, col, msg
+               tablet_error(task, idx, row, col, msg
                                         && *msg ? msg : "insert failed", err);
                //GDKfree(err);
        }
@@ -923,12 +929,11 @@ SQLinsert_val(allocator *ma, READERtask 
 }
 
 static int
-SQLworker_column(READERtask *task, int col)
+SQLworker_column(allocator *ma, READERtask *task, int col)
 {
+       assert(ma);
        int i;
        Column *fmt = task->as->format;
-       allocator *ma = task->cntxt->curprg->def->ma;
-       assert(ma);
 
        if (fmt[col].c == NULL)
                return 0;
@@ -938,7 +943,7 @@ SQLworker_column(READERtask *task, int c
        if (!fmt[col].skip
                && BATcapacity(fmt[col].c) < BATcount(fmt[col].c) + task->next) 
{
                if (BATextend(fmt[col].c, BATgrows(fmt[col].c) + task->limit) 
!= GDK_SUCCEED) {
-                       tablet_error(ma, task, lng_nil, lng_nil, col,
+                       tablet_error(task, lng_nil, lng_nil, col,
                                                 "Failed to extend the BAT\n", 
"SQLworker_column");
                        MT_lock_unset(&mal_copyLock);
                        return -1;
@@ -994,7 +999,7 @@ SQLload_parse_row(allocator *ma, READERt
                                if (!row) {
                                        errline = SQLload_error(ma, task, idx, 
i + 1);
                                        snprintf(errmsg, sizeof(errmsg), "Quote 
(%c) missing", task->quote);
-                                       tablet_error(ma, task, idx, 
startlineno, (int) i, errmsg,
+                                       tablet_error(task, idx, startlineno, 
(int) i, errmsg,
                                                                 errline);
                                        //GDKfree(errline);
                                        error = true;
@@ -1021,7 +1026,7 @@ SQLload_parse_row(allocator *ma, READERt
                        if (i < as->nr_attrs - 1) {
                                errline = SQLload_error(ma, task, idx, i + 1);
                                /* it's the next value that is missing */
-                               tablet_error(ma, task, idx, startlineno, (int) 
i + 1,
+                               tablet_error(task, idx, startlineno, (int) i + 
1,
                                                         "Column value 
missing", errline);
                                //GDKfree(errline);
                                error = true;
@@ -1060,7 +1065,7 @@ SQLload_parse_row(allocator *ma, READERt
                        if (i < as->nr_attrs - 1) {
                                errline = SQLload_error(ma, task, idx, i + 1);
                                /* it's the next value that is missing */
-                               tablet_error(ma, task, idx, startlineno, (int) 
i + 1,
+                               tablet_error(task, idx, startlineno, (int) i + 
1,
                                                         "Column value 
missing", errline);
                                //GDKfree(errline);
                                error = true;
@@ -1082,7 +1087,7 @@ SQLload_parse_row(allocator *ma, READERt
        if (row && *row && i == as->nr_attrs) {
                errline = SQLload_error(ma, task, idx, task->as->nr_attrs);
                snprintf(errmsg, sizeof(errmsg), "Leftover data '%s'", row);
-               tablet_error(ma, task, idx, startlineno, (int) i, errmsg, 
errline);
+               tablet_error(task, idx, startlineno, (int) i, errmsg, errline);
                //GDKfree(errline);
                error = true;
        }
@@ -1103,10 +1108,12 @@ SQLworker(void *arg)
        MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
        //allocator *ma = task->cntxt ? ma_create(task->cntxt->curprg->def->ma) 
: NULL;
        //MT_thread_setallocator(ma);
-       allocator *ma =  task->cntxt->curprg->def->ma;
+       allocator *ma = MT_thread_getallocator();
+       assert(ma);
 
        MT_sema_down(&task->sema);
        while (task->top[task->cur] >= 0) {
+               allocator_state *ma_state = ma_open(ma);
                /* stage one, break the rows spread the work over the workers */
                switch (task->state) {
                case BREAKROW:
@@ -1138,7 +1145,7 @@ SQLworker(void *arg)
                        for (i = 0; i < task->as->nr_attrs; i++)
                                if (task->cols[i]) {
                                        t0 = GDKusec();
-                                       if (SQLworker_column(task, 
task->cols[i] - 1) < 0)
+                                       if (SQLworker_column(ma, task, 
task->cols[i] - 1) < 0)
                                                break;
                                        t0 = GDKusec() - t0;
                                        task->time[i] += t0;
@@ -1151,6 +1158,7 @@ SQLworker(void *arg)
                }
                MT_sema_up(&task->reply);
                MT_sema_down(&task->sema);
+               ma_close_to(ma, ma_state);
        }
        MT_sema_up(&task->reply);
 
@@ -1275,8 +1283,7 @@ SQLproducer(void *p)
        int more = 0;
        //allocator *ma = task->cntxt ? ma_create(task->cntxt->curprg->def->ma) 
: NULL;
        //MT_thread_setallocator(ma);
-       allocator *ma = task->cntxt->curprg->def->ma;
-       allocator *ta = MT_thread_getallocator();
+       allocator *ma = MT_thread_getallocator();
 
        MT_sema_down(&task->producer);
        if (task->id < 0) {
@@ -1284,9 +1291,9 @@ SQLproducer(void *p)
        }
 
        MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
-       rdfa = mkdfa(ta, (const unsigned char *) rsep, rseplen);
+       rdfa = mkdfa(ma, (const unsigned char *) rsep, rseplen);
        if (rdfa == NULL) {
-               tablet_error(ma, task, lng_nil, lng_nil, int_nil, "cannot 
allocate memory",
+               tablet_error(task, lng_nil, lng_nil, int_nil, "cannot allocate 
memory",
                                         "");
                ateof[cur] = true;
                goto reportlackofinput;
@@ -1308,7 +1315,7 @@ SQLproducer(void *p)
                // we may be reading from standard input and may be out of input
                // warn the consumers
                if (task->aborted || ((lineno & 8191) == 0 && 
bstream_getoob(task->cntxt->fdin))) {
-                       tablet_error(ma, task, rowno, lineno, int_nil,
+                       tablet_error(task, rowno, lineno, int_nil,
                                                 "problem reported by client", 
s);
                        ateof[cur] = true;
                        goto reportlackofinput;
@@ -1316,7 +1323,7 @@ SQLproducer(void *p)
 
                if (ateof[cur] && partial) {
                        if (unlikely(partial)) {
-                               tablet_error(ma, task, rowno, lineno, int_nil,
+                               tablet_error(task, rowno, lineno, int_nil,
                                                         "incomplete record at 
end of file", s);
                                task->b->pos += partial;
                        }
@@ -1325,7 +1332,7 @@ SQLproducer(void *p)
 
                if (task->errbuf && task->errbuf[0]) {
                        if (unlikely(GDKerrbuf && GDKerrbuf[0])) {
-                               tablet_error(ma, task, rowno, lineno, int_nil, 
GDKerrbuf,
+                               tablet_error(task, rowno, lineno, int_nil, 
GDKerrbuf,
                                                         "SQLload_file");
 /*                             TRC_DEBUG(MAL_SERVER, "Bailout on SQLload\n");*/
                                ateof[cur] = true;
@@ -1345,7 +1352,7 @@ SQLproducer(void *p)
                        /* the input buffer should be extended, but 'base' is 
not shared
                           between the threads, which we can not now update.
                           Mimic an ateof instead; */
-                       tablet_error(ma, task, rowno, lineno, int_nil, "record 
too long", "");
+                       tablet_error(task, rowno, lineno, int_nil, "record too 
long", "");
                        ateof[cur] = true;
 /*                     TRC_DEBUG(MAL_SERVER, "Bailout on SQLload confronted 
with too large record\n");*/
                        goto reportlackofinput;
@@ -1442,7 +1449,7 @@ SQLproducer(void *p)
                                /* found an incomplete record, saved for next 
round */
                                if (unlikely(s + partial < end)) {
                                        /* found a EOS in the input */
-                                       tablet_error(ma, task, rowno, 
startlineno, int_nil,
+                                       tablet_error(task, rowno, startlineno, 
int_nil,
                                                                 "record too 
long (EOS found)", "");
                                        ateof[cur] = true;
                                        goto reportlackofinput;
@@ -1544,8 +1551,9 @@ SQLproducer(void *p)
        if (unlikely(cnt < task->maxrow && task->maxrow != BUN_NONE)) {
                char msg[256];
                snprintf(msg, sizeof(msg), "incomplete record at end of file:%s\n", s);
-               task->as->error = GDKstrdup(msg);
-               tablet_error(ma, task, rowno, startlineno, int_nil,
+               allocator *ma = task->cntxt->curprg->def->ma;
+               task->as->error = MA_STRDUP(ma, msg);
+               tablet_error(task, rowno, startlineno, int_nil,
                                         "incomplete record at end of file", s);
                task->b->pos += partial;
        }
@@ -1555,7 +1563,7 @@ SQLproducer(void *p)
        return;
 
   badutf8:
-       tablet_error(ma, task, rowno, startlineno, int_nil,
+       tablet_error(task, rowno, startlineno, int_nil,
                                 "input not properly encoded UTF-8", "");
        ateof[cur] = true;
        goto reportlackofinput;
@@ -1622,7 +1630,7 @@ SQLload_file(Client cntxt, Tablet *as, b
        create_rejects_table(task.cntxt);
        if (task.cntxt->error_row == NULL || task.cntxt->error_fld == NULL
                || task.cntxt->error_msg == NULL || task.cntxt->error_input == 
NULL) {
-               tablet_error(ma, &task, lng_nil, lng_nil, int_nil,
+               tablet_error(&task, lng_nil, lng_nil, int_nil,
                                         "SQLload initialization failed", "");
                /* nothing allocated yet, so nothing to free */
                return BUN_NONE;
@@ -1635,7 +1643,7 @@ SQLload_file(Client cntxt, Tablet *as, b
        task.cols = (int *) ma_zalloc(ma, as->nr_attrs * sizeof(int));
        task.time = (lng *) ma_zalloc(ma, as->nr_attrs * sizeof(lng));
        if (task.fields == NULL || task.cols == NULL || task.time == NULL) {
-               tablet_error(ma, &task, lng_nil, lng_nil, int_nil,
+               tablet_error(&task, lng_nil, lng_nil, int_nil,
                                         "memory allocation failed", 
"SQLload_file");
                goto bailout;
        }
@@ -1644,7 +1652,7 @@ SQLload_file(Client cntxt, Tablet *as, b
                task.base[i] = ma_alloc(ma, MAXROWSIZE(2 * b->size) + 2);
                task.rowlimit[i] = MAXROWSIZE(2 * b->size);
                if (task.base[i] == NULL) {
-                       tablet_error(ma, &task, lng_nil, lng_nil, int_nil,
+                       tablet_error(&task, lng_nil, lng_nil, int_nil,
                                                 SQLSTATE(HY013) 
MAL_MALLOC_FAIL, "SQLload_file");
                        goto bailout;
                }
@@ -1697,7 +1705,7 @@ SQLload_file(Client cntxt, Tablet *as, b
                if (task.rows[i] == NULL || task.startlineno[i] == NULL) {
                        //GDKfree(task.rows[i]);
                        //GDKfree(task.startlineno[i]);
-                       tablet_error(ma, &task, lng_nil, lng_nil, int_nil,
+                       tablet_error(&task, lng_nil, lng_nil, int_nil,
                                                 SQLSTATE(HY013) 
MAL_MALLOC_FAIL,
                                                 "SQLload_file:failed to alloc 
buffers");
                        goto bailout;
@@ -1705,7 +1713,7 @@ SQLload_file(Client cntxt, Tablet *as, b
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to