Changeset: 94ff90f05b23 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/94ff90f05b23
Modified Files:
        monetdb5/modules/atoms/json.c
        monetdb5/modules/mal/tablet.c
Branch: Dec2025
Log Message:

Make more use of thread-local allocator, here in tablet code.


diffs (truncated from 397 to 300 lines):

diff --git a/monetdb5/modules/atoms/json.c b/monetdb5/modules/atoms/json.c
--- a/monetdb5/modules/atoms/json.c
+++ b/monetdb5/modules/atoms/json.c
@@ -503,8 +503,7 @@ JSONstr2json_intern(allocator *ma, json 
        }
 
        allocator *ta = MT_thread_getallocator();
-       allocator_state ta_state = {0};
-       ta_state = ma_open(ta);
+       allocator_state ta_state = ma_open(ta);
 
        if (strNil(*j)) {
                strcpy(buf, *j);
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -808,12 +808,13 @@ mycpstr(char *t, const char *s, size_t l
 }
 
 static str
-SQLload_error(allocator *ma, READERtask *task, lng idx, BUN attrs)
+SQLload_error(READERtask *task, lng idx, BUN attrs)
 {
        str line;
        char *s;
        size_t sz = 0;
        BUN i;
+       allocator *ma = MT_thread_getallocator();
 
        for (i = 0; i < attrs; i++) {
                if (task->fields[i][idx])
@@ -849,7 +850,7 @@ SQLload_error(allocator *ma, READERtask 
  * either case an entry is added to the error table.
  */
 static inline int
-SQLinsert_val(allocator *ma, READERtask *task, int col, int idx)
+SQLinsert_val(READERtask *task, int col, int idx)
 {
        Column *fmt = task->as->format + col;
        const void *adt;
@@ -857,6 +858,9 @@ SQLinsert_val(allocator *ma, READERtask 
        char *s = task->fields[col][idx];
        char *err = NULL;
        int ret = 0;
+       allocator *ta = MT_thread_getallocator();
+       allocator_state ta_state = ma_open(ta);
+       allocator *ma = task->cntxt->curprg->def->ma;
 
        /* include testing on the terminating null byte !! */
        if (s == NULL) {
@@ -865,7 +869,7 @@ SQLinsert_val(allocator *ma, READERtask 
        } else {
                if (task->escape) {
                        size_t slen = strlen(s) + 1;
-                       char *data = slen <= sizeof(buf) ? buf : ma_alloc(ma, 
strlen(s) + 1);
+                       char *data = slen <= sizeof(buf) ? buf : ma_alloc(ta, 
strlen(s) + 1);
                        if (data == NULL
                                || GDKstrFromStr((unsigned char *) data, 
(unsigned char *) s,
                                                                 strlen(s), 
'\0') < 0)
@@ -883,15 +887,15 @@ SQLinsert_val(allocator *ma, READERtask 
        lng row = task->cnt + idx + 1;
        if (adt == NULL) {
                if (task->rowerror) {
-                       err = SQLload_error(ma, task, idx, task->as->nr_attrs);
+                       err = SQLload_error(task, idx, task->as->nr_attrs);
                        if (s) {
                                size_t slen = mystrlen(s);
-                               char *scpy = ma_alloc(ma, slen + 1);
+                               char *scpy = ma_alloc(ta, slen + 1);
                                if (scpy == NULL) {
                                        tablet_error(task, idx, row, col,
                                                                 
SQLSTATE(HY013) MAL_MALLOC_FAIL, err);
                                        task->besteffort = false;       /* no 
longer best effort */
-                                       //GDKfree(err);
+                                       ma_close(ta, &ta_state);
                                        return -1;
                                }
                                mycpstr(scpy, s, slen + 1);
@@ -902,33 +906,37 @@ SQLinsert_val(allocator *ma, READERtask 
                        //GDKfree(s);
                        tablet_error(task, idx, row, col, buf, err);
                        //GDKfree(err);
-                       if (!task->besteffort)
+                       if (!task->besteffort) {
+                               ma_close(ta, &ta_state);
                                return -1;
+                       }
                }
                ret = -!task->besteffort;       /* yep, two unary operators ;-) 
*/
                /* replace it with a nil */
                adt = fmt->nildata;
                fmt->c->tnonil = false;
        }
-       if (bunfastapp(fmt->c, adt) == GDK_SUCCEED)
+       if (bunfastapp(fmt->c, adt) == GDK_SUCCEED) {
+               ma_close(ta, &ta_state);
                return ret;
+       }
 
        /* failure */
        if (task->rowerror) {
                char *msg = GDKerrbuf;
-               err = SQLload_error(ma, task, idx, task->as->nr_attrs);
+               err = SQLload_error(task, idx, task->as->nr_attrs);
                tablet_error(task, idx, row, col, msg
                                         && *msg ? msg : "insert failed", err);
                //GDKfree(err);
        }
        task->besteffort = false;       /* no longer best effort */
+       ma_close(ta, &ta_state);
        return -1;
 }
 
 static int
-SQLworker_column(allocator *ma, READERtask *task, int col)
+SQLworker_column(READERtask *task, int col)
 {
-       assert(ma);
        int i;
        Column *fmt = task->as->format;
 
@@ -949,7 +957,7 @@ SQLworker_column(allocator *ma, READERta
        MT_lock_unset(&mal_copyLock);
 
        for (i = 0; i < task->top[task->cur]; i++) {
-               if (!fmt[col].skip && SQLinsert_val(ma, task, col, i) < 0) {
+               if (!fmt[col].skip && SQLinsert_val(task, col, i) < 0) {
                        BATsetcount(fmt[col].c, BATcount(fmt[col].c));
                        return -1;
                }
@@ -967,7 +975,7 @@ SQLworker_column(allocator *ma, READERta
  * We also trim the quotes around strings.
  */
 static int
-SQLload_parse_row(allocator *ma, READERtask *task, int idx)
+SQLload_parse_row(READERtask *task, int idx)
 {
        BUN i;
        char errmsg[BUFSIZ];
@@ -994,7 +1002,7 @@ SQLload_parse_row(allocator *ma, READERt
                                row = tablet_skip_string(row + 1, task->quote, 
task->escape);
 
                                if (!row) {
-                                       errline = SQLload_error(ma, task, idx, 
i + 1);
+                                       errline = SQLload_error(task, idx, i + 
1);
                                        snprintf(errmsg, sizeof(errmsg), "Quote 
(%c) missing", task->quote);
                                        tablet_error(task, idx, startlineno, 
(int) i, errmsg,
                                                                 errline);
@@ -1021,7 +1029,7 @@ SQLload_parse_row(allocator *ma, READERt
 
                        /* not enough fields */
                        if (i < as->nr_attrs - 1) {
-                               errline = SQLload_error(ma, task, idx, i + 1);
+                               errline = SQLload_error(task, idx, i + 1);
                                /* it's the next value that is missing */
                                tablet_error(task, idx, startlineno, (int) i + 
1,
                                                         "Column value 
missing", errline);
@@ -1060,7 +1068,7 @@ SQLload_parse_row(allocator *ma, READERt
 
                        /* not enough fields */
                        if (i < as->nr_attrs - 1) {
-                               errline = SQLload_error(ma, task, idx, i + 1);
+                               errline = SQLload_error(task, idx, i + 1);
                                /* it's the next value that is missing */
                                tablet_error(task, idx, startlineno, (int) i + 
1,
                                                         "Column value 
missing", errline);
@@ -1082,7 +1090,7 @@ SQLload_parse_row(allocator *ma, READERt
        }
        /* check for too many values as well */
        if (row && *row && i == as->nr_attrs) {
-               errline = SQLload_error(ma, task, idx, task->as->nr_attrs);
+               errline = SQLload_error(task, idx, task->as->nr_attrs);
                snprintf(errmsg, sizeof(errmsg), "Leftover data '%s'", row);
                tablet_error(task, idx, startlineno, (int) i, errmsg, errline);
                //GDKfree(errline);
@@ -1101,8 +1109,6 @@ SQLworker(void *arg)
 
        GDKclrerr();
        MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
-       allocator *ma = task->cntxt->curprg->def->ma;
-       assert(ma);
 
        MT_sema_down(&task->sema);
        while (task->top[task->cur] >= 0) {
@@ -1115,7 +1121,7 @@ SQLworker(void *arg)
                        for (j = piece * task->id;
                                 j < task->top[task->cur] && j < piece * 
(task->id + 1); j++)
                                if (task->rows[task->cur][j]) {
-                                       if (SQLload_parse_row(ma, task, j) < 0) 
{
+                                       if (SQLload_parse_row(task, j) < 0) {
                                                task->errorcnt++;
                                                // early break unless best 
effort
                                                if (!task->besteffort) {
@@ -1137,7 +1143,7 @@ SQLworker(void *arg)
                        for (i = 0; i < task->as->nr_attrs; i++)
                                if (task->cols[i]) {
                                        t0 = GDKusec();
-                                       if (SQLworker_column(ma, task, 
task->cols[i] - 1) < 0)
+                                       if (SQLworker_column(task, 
task->cols[i] - 1) < 0)
                                                break;
                                        t0 = GDKusec() - t0;
                                        task->time[i] += t0;
@@ -1277,8 +1283,10 @@ SQLproducer(void *p)
                return;
        }
 
+       allocator *ta = MT_thread_getallocator();
+       allocator_state ta_state = ma_open(ta);
        MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
-       rdfa = mkdfa(ma, (const unsigned char *) rsep, rseplen);
+       rdfa = mkdfa(ta, (const unsigned char *) rsep, rseplen);
        if (rdfa == NULL) {
                tablet_error(task, lng_nil, lng_nil, int_nil, "cannot allocate 
memory",
                                         "");
@@ -1317,7 +1325,7 @@ SQLproducer(void *p)
                        goto reportlackofinput;
                }
 
-               if (GDKerrbuf[0]) {
+               if (unlikely(GDKerrbuf[0])) {
                        tablet_error(task, rowno, lineno, int_nil, GDKerrbuf,
                                                 "SQLload_file");
 /*                             TRC_DEBUG(MAL_SERVER, "Bailout on SQLload\n");*/
@@ -1475,6 +1483,7 @@ SQLproducer(void *p)
                        if (cnt == task->maxrow) {
                                //GDKfree(rdfa);
                                MT_thread_set_qry_ctx(NULL);
+                               ma_close(ta, &ta_state);
                                return;
                        }
                } else {
@@ -1488,6 +1497,7 @@ SQLproducer(void *p)
                                if (task->state == ENDOFCOPY) {
                                        //GDKfree(rdfa);
                                        MT_thread_set_qry_ctx(NULL);
+                                       ma_close(ta, &ta_state);
                                        return;
                                }
                        }
@@ -1511,6 +1521,7 @@ SQLproducer(void *p)
 /*                             TRC_DEBUG(MAL_SERVER, "Producer delivered 
all\n");*/
                                //GDKfree(rdfa);
                                MT_thread_set_qry_ctx(NULL);
+                               ma_close(ta, &ta_state);
                                return;
                        }
                }
@@ -1521,12 +1532,14 @@ SQLproducer(void *p)
 /*                     TRC_DEBUG(MAL_SERVER, "Producer encountered eof\n");*/
                        //GDKfree(rdfa);
                        MT_thread_set_qry_ctx(NULL);
+                       ma_close(ta, &ta_state);
                        return;
                }
                /* consumers ask us to stop? */
                if (task->state == ENDOFCOPY) {
                        //GDKfree(rdfa);
                        MT_thread_set_qry_ctx(NULL);
+                       ma_close(ta, &ta_state);
                        return;
                }
                bufcnt[cur] = cnt;
@@ -1543,6 +1556,7 @@ SQLproducer(void *p)
        }
        //GDKfree(rdfa);
        MT_thread_set_qry_ctx(NULL);
+       ma_close(ta, &ta_state);
 
        return;
 
@@ -1590,6 +1604,8 @@ SQLload_file(Client cntxt, Tablet *as, b
        lng tio, t1 = 0;
        char name[MT_NAME_LEN];
        allocator *ma = cntxt->curprg->def->ma;
+       allocator *ta = MT_thread_getallocator();
+       allocator_state ta_state = ma_open(ta);
 
        if (maxrow < 0 || maxrow > (LL_CONSTANT(1) << 16)) {
                threads = GDKgetenv_int("tablet_threads", GDKnr_threads);
@@ -1613,7 +1629,7 @@ SQLload_file(Client cntxt, Tablet *as, b
        /* create the reject tables */
        create_rejects_table(task.cntxt);
        if (task.cntxt->error_row == NULL || task.cntxt->error_fld == NULL
-               || task.cntxt->error_msg == NULL || task.cntxt->error_input == 
NULL) {
+           || task.cntxt->error_msg == NULL || task.cntxt->error_input == 
NULL) {
                tablet_error(&task, lng_nil, lng_nil, int_nil,
                                         "SQLload initialization failed", "");
                /* nothing allocated yet, so nothing to free */
@@ -1623,9 +1639,9 @@ SQLload_file(Client cntxt, Tablet *as, b
        assert(rsep);
        assert(csep);
        assert(maxrow < 0 || maxrow <= (lng) BUN_MAX);
-       task.fields = (char ***) ma_zalloc(ma, as->nr_attrs * sizeof(char **));
-       task.cols = (int *) ma_zalloc(ma, as->nr_attrs * sizeof(int));
-       task.time = (lng *) ma_zalloc(ma, as->nr_attrs * sizeof(lng));
+       task.fields = (char ***) ma_zalloc(ta, as->nr_attrs * sizeof(char **));
+       task.cols = (int *) ma_zalloc(ta, as->nr_attrs * sizeof(int));
+       task.time = (lng *) ma_zalloc(ta, as->nr_attrs * sizeof(lng));
        if (task.fields == NULL || task.cols == NULL || task.time == NULL) {
                tablet_error(&task, lng_nil, lng_nil, int_nil,
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to