Changeset: 94ff90f05b23 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/94ff90f05b23
Modified Files:
monetdb5/modules/atoms/json.c
monetdb5/modules/mal/tablet.c
Branch: Dec2025
Log Message:
Make more use of the thread-local allocator, here in the tablet code.
diffs (truncated from 397 to 300 lines):
diff --git a/monetdb5/modules/atoms/json.c b/monetdb5/modules/atoms/json.c
--- a/monetdb5/modules/atoms/json.c
+++ b/monetdb5/modules/atoms/json.c
@@ -503,8 +503,7 @@ JSONstr2json_intern(allocator *ma, json
}
allocator *ta = MT_thread_getallocator();
- allocator_state ta_state = {0};
- ta_state = ma_open(ta);
+ allocator_state ta_state = ma_open(ta);
if (strNil(*j)) {
strcpy(buf, *j);
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -808,12 +808,13 @@ mycpstr(char *t, const char *s, size_t l
}
static str
-SQLload_error(allocator *ma, READERtask *task, lng idx, BUN attrs)
+SQLload_error(READERtask *task, lng idx, BUN attrs)
{
str line;
char *s;
size_t sz = 0;
BUN i;
+ allocator *ma = MT_thread_getallocator();
for (i = 0; i < attrs; i++) {
if (task->fields[i][idx])
@@ -849,7 +850,7 @@ SQLload_error(allocator *ma, READERtask
* either case an entry is added to the error table.
*/
static inline int
-SQLinsert_val(allocator *ma, READERtask *task, int col, int idx)
+SQLinsert_val(READERtask *task, int col, int idx)
{
Column *fmt = task->as->format + col;
const void *adt;
@@ -857,6 +858,9 @@ SQLinsert_val(allocator *ma, READERtask
char *s = task->fields[col][idx];
char *err = NULL;
int ret = 0;
+ allocator *ta = MT_thread_getallocator();
+ allocator_state ta_state = ma_open(ta);
+ allocator *ma = task->cntxt->curprg->def->ma;
/* include testing on the terminating null byte !! */
if (s == NULL) {
@@ -865,7 +869,7 @@ SQLinsert_val(allocator *ma, READERtask
} else {
if (task->escape) {
size_t slen = strlen(s) + 1;
- char *data = slen <= sizeof(buf) ? buf : ma_alloc(ma, strlen(s) + 1);
+ char *data = slen <= sizeof(buf) ? buf : ma_alloc(ta, strlen(s) + 1);
if (data == NULL
|| GDKstrFromStr((unsigned char *) data,
(unsigned char *) s,
strlen(s),
'\0') < 0)
@@ -883,15 +887,15 @@ SQLinsert_val(allocator *ma, READERtask
lng row = task->cnt + idx + 1;
if (adt == NULL) {
if (task->rowerror) {
- err = SQLload_error(ma, task, idx, task->as->nr_attrs);
+ err = SQLload_error(task, idx, task->as->nr_attrs);
if (s) {
size_t slen = mystrlen(s);
- char *scpy = ma_alloc(ma, slen + 1);
+ char *scpy = ma_alloc(ta, slen + 1);
if (scpy == NULL) {
tablet_error(task, idx, row, col,
SQLSTATE(HY013) MAL_MALLOC_FAIL, err);
task->besteffort = false; /* no longer best effort */
- //GDKfree(err);
+ ma_close(ta, &ta_state);
return -1;
}
mycpstr(scpy, s, slen + 1);
@@ -902,33 +906,37 @@ SQLinsert_val(allocator *ma, READERtask
//GDKfree(s);
tablet_error(task, idx, row, col, buf, err);
//GDKfree(err);
- if (!task->besteffort)
+ if (!task->besteffort) {
+ ma_close(ta, &ta_state);
return -1;
+ }
}
ret = -!task->besteffort; /* yep, two unary operators ;-) */
/* replace it with a nil */
adt = fmt->nildata;
fmt->c->tnonil = false;
}
- if (bunfastapp(fmt->c, adt) == GDK_SUCCEED)
+ if (bunfastapp(fmt->c, adt) == GDK_SUCCEED) {
+ ma_close(ta, &ta_state);
return ret;
+ }
/* failure */
if (task->rowerror) {
char *msg = GDKerrbuf;
- err = SQLload_error(ma, task, idx, task->as->nr_attrs);
+ err = SQLload_error(task, idx, task->as->nr_attrs);
tablet_error(task, idx, row, col, msg
&& *msg ? msg : "insert failed", err);
//GDKfree(err);
}
task->besteffort = false; /* no longer best effort */
+ ma_close(ta, &ta_state);
return -1;
}
static int
-SQLworker_column(allocator *ma, READERtask *task, int col)
+SQLworker_column(READERtask *task, int col)
{
- assert(ma);
int i;
Column *fmt = task->as->format;
@@ -949,7 +957,7 @@ SQLworker_column(allocator *ma, READERta
MT_lock_unset(&mal_copyLock);
for (i = 0; i < task->top[task->cur]; i++) {
- if (!fmt[col].skip && SQLinsert_val(ma, task, col, i) < 0) {
+ if (!fmt[col].skip && SQLinsert_val(task, col, i) < 0) {
BATsetcount(fmt[col].c, BATcount(fmt[col].c));
return -1;
}
@@ -967,7 +975,7 @@ SQLworker_column(allocator *ma, READERta
* We also trim the quotes around strings.
*/
static int
-SQLload_parse_row(allocator *ma, READERtask *task, int idx)
+SQLload_parse_row(READERtask *task, int idx)
{
BUN i;
char errmsg[BUFSIZ];
@@ -994,7 +1002,7 @@ SQLload_parse_row(allocator *ma, READERt
row = tablet_skip_string(row + 1, task->quote,
task->escape);
if (!row) {
- errline = SQLload_error(ma, task, idx, i + 1);
+ errline = SQLload_error(task, idx, i + 1);
snprintf(errmsg, sizeof(errmsg), "Quote (%c) missing", task->quote);
tablet_error(task, idx, startlineno, (int) i, errmsg, errline);
@@ -1021,7 +1029,7 @@ SQLload_parse_row(allocator *ma, READERt
/* not enough fields */
if (i < as->nr_attrs - 1) {
- errline = SQLload_error(ma, task, idx, i + 1);
+ errline = SQLload_error(task, idx, i + 1);
/* it's the next value that is missing */
tablet_error(task, idx, startlineno, (int) i + 1,
"Column value missing", errline);
@@ -1060,7 +1068,7 @@ SQLload_parse_row(allocator *ma, READERt
/* not enough fields */
if (i < as->nr_attrs - 1) {
- errline = SQLload_error(ma, task, idx, i + 1);
+ errline = SQLload_error(task, idx, i + 1);
/* it's the next value that is missing */
tablet_error(task, idx, startlineno, (int) i + 1,
"Column value missing", errline);
@@ -1082,7 +1090,7 @@ SQLload_parse_row(allocator *ma, READERt
}
/* check for too many values as well */
if (row && *row && i == as->nr_attrs) {
- errline = SQLload_error(ma, task, idx, task->as->nr_attrs);
+ errline = SQLload_error(task, idx, task->as->nr_attrs);
snprintf(errmsg, sizeof(errmsg), "Leftover data '%s'", row);
tablet_error(task, idx, startlineno, (int) i, errmsg, errline);
//GDKfree(errline);
@@ -1101,8 +1109,6 @@ SQLworker(void *arg)
GDKclrerr();
MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
- allocator *ma = task->cntxt->curprg->def->ma;
- assert(ma);
MT_sema_down(&task->sema);
while (task->top[task->cur] >= 0) {
@@ -1115,7 +1121,7 @@ SQLworker(void *arg)
for (j = piece * task->id;
j < task->top[task->cur] && j < piece * (task->id + 1); j++)
if (task->rows[task->cur][j]) {
- if (SQLload_parse_row(ma, task, j) < 0) {
+ if (SQLload_parse_row(task, j) < 0) {
task->errorcnt++;
// early break unless best effort
if (!task->besteffort) {
@@ -1137,7 +1143,7 @@ SQLworker(void *arg)
for (i = 0; i < task->as->nr_attrs; i++)
if (task->cols[i]) {
t0 = GDKusec();
- if (SQLworker_column(ma, task, task->cols[i] - 1) < 0)
+ if (SQLworker_column(task, task->cols[i] - 1) < 0)
break;
t0 = GDKusec() - t0;
task->time[i] += t0;
@@ -1277,8 +1283,10 @@ SQLproducer(void *p)
return;
}
+ allocator *ta = MT_thread_getallocator();
+ allocator_state ta_state = ma_open(ta);
MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
- rdfa = mkdfa(ma, (const unsigned char *) rsep, rseplen);
+ rdfa = mkdfa(ta, (const unsigned char *) rsep, rseplen);
if (rdfa == NULL) {
tablet_error(task, lng_nil, lng_nil, int_nil, "cannot allocate memory",
"");
@@ -1317,7 +1325,7 @@ SQLproducer(void *p)
goto reportlackofinput;
}
- if (GDKerrbuf[0]) {
+ if (unlikely(GDKerrbuf[0])) {
tablet_error(task, rowno, lineno, int_nil, GDKerrbuf,
"SQLload_file");
/* TRC_DEBUG(MAL_SERVER, "Bailout on SQLload\n");*/
@@ -1475,6 +1483,7 @@ SQLproducer(void *p)
if (cnt == task->maxrow) {
//GDKfree(rdfa);
MT_thread_set_qry_ctx(NULL);
+ ma_close(ta, &ta_state);
return;
}
} else {
@@ -1488,6 +1497,7 @@ SQLproducer(void *p)
if (task->state == ENDOFCOPY) {
//GDKfree(rdfa);
MT_thread_set_qry_ctx(NULL);
+ ma_close(ta, &ta_state);
return;
}
}
@@ -1511,6 +1521,7 @@ SQLproducer(void *p)
/* TRC_DEBUG(MAL_SERVER, "Producer delivered all\n");*/
//GDKfree(rdfa);
MT_thread_set_qry_ctx(NULL);
+ ma_close(ta, &ta_state);
return;
}
}
@@ -1521,12 +1532,14 @@ SQLproducer(void *p)
/* TRC_DEBUG(MAL_SERVER, "Producer encountered eof\n");*/
//GDKfree(rdfa);
MT_thread_set_qry_ctx(NULL);
+ ma_close(ta, &ta_state);
return;
}
/* consumers ask us to stop? */
if (task->state == ENDOFCOPY) {
//GDKfree(rdfa);
MT_thread_set_qry_ctx(NULL);
+ ma_close(ta, &ta_state);
return;
}
bufcnt[cur] = cnt;
@@ -1543,6 +1556,7 @@ SQLproducer(void *p)
}
//GDKfree(rdfa);
MT_thread_set_qry_ctx(NULL);
+ ma_close(ta, &ta_state);
return;
@@ -1590,6 +1604,8 @@ SQLload_file(Client cntxt, Tablet *as, b
lng tio, t1 = 0;
char name[MT_NAME_LEN];
allocator *ma = cntxt->curprg->def->ma;
+ allocator *ta = MT_thread_getallocator();
+ allocator_state ta_state = ma_open(ta);
if (maxrow < 0 || maxrow > (LL_CONSTANT(1) << 16)) {
threads = GDKgetenv_int("tablet_threads", GDKnr_threads);
@@ -1613,7 +1629,7 @@ SQLload_file(Client cntxt, Tablet *as, b
/* create the reject tables */
create_rejects_table(task.cntxt);
if (task.cntxt->error_row == NULL || task.cntxt->error_fld == NULL
- || task.cntxt->error_msg == NULL || task.cntxt->error_input == NULL) {
+ || task.cntxt->error_msg == NULL || task.cntxt->error_input == NULL) {
tablet_error(&task, lng_nil, lng_nil, int_nil,
"SQLload initialization failed", "");
/* nothing allocated yet, so nothing to free */
@@ -1623,9 +1639,9 @@ SQLload_file(Client cntxt, Tablet *as, b
assert(rsep);
assert(csep);
assert(maxrow < 0 || maxrow <= (lng) BUN_MAX);
- task.fields = (char ***) ma_zalloc(ma, as->nr_attrs * sizeof(char **));
- task.cols = (int *) ma_zalloc(ma, as->nr_attrs * sizeof(int));
- task.time = (lng *) ma_zalloc(ma, as->nr_attrs * sizeof(lng));
+ task.fields = (char ***) ma_zalloc(ta, as->nr_attrs * sizeof(char **));
+ task.cols = (int *) ma_zalloc(ta, as->nr_attrs * sizeof(int));
+ task.time = (lng *) ma_zalloc(ta, as->nr_attrs * sizeof(lng));
if (task.fields == NULL || task.cols == NULL || task.time == NULL) {
tablet_error(&task, lng_nil, lng_nil, int_nil,
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]