Changeset: 2a26598a4f3f for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=2a26598a4f3f
Modified Files:
monetdb5/modules/mal/tablet.c
Branch: default
Log Message:
The "task" structure can be allocated on the stack.
diffs (truncated from 479 to 300 lines):
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -1639,7 +1639,7 @@ SQLload_file(Client cntxt, Tablet *as, b
int j;
BUN firstcol;
BUN i, attr;
- READERtask *task = (READERtask *) GDKzalloc(sizeof(READERtask));
+ READERtask task;
READERtask ptask[MAXWORKERS];
int threads = (!maxrow || maxrow > (1 << 16)) ? (GDKnr_threads <
MAXWORKERS && GDKnr_threads > 1 ? GDKnr_threads - 1 : MAXWORKERS - 1) : 1;
lng lio = 0, tio, t1 = 0, total = 0, iototal = 0;
@@ -1650,17 +1650,14 @@ SQLload_file(Client cntxt, Tablet *as, b
threads, csep, rsep, quote);
#endif
memset(ptask, 0, sizeof(ptask));
+ memset(&task, 0, sizeof(task));
- if (task == 0) {
- //SQLload file error
- return BUN_NONE;
- }
- task->cntxt = cntxt;
+ task.cntxt = cntxt;
/* create the reject tables */
- create_rejects_table(task->cntxt);
- if (task->cntxt->error_row == NULL || task->cntxt->error_fld == NULL ||
task->cntxt->error_msg == NULL || task->cntxt->error_input == NULL) {
- tablet_error(task, lng_nil, int_nil, "SQLload initialization
failed", "");
+ create_rejects_table(task.cntxt);
+ if (task.cntxt->error_row == NULL || task.cntxt->error_fld == NULL ||
task.cntxt->error_msg == NULL || task.cntxt->error_input == NULL) {
+ tablet_error(&task, lng_nil, int_nil, "SQLload initialization
failed", "");
goto bailout;
}
@@ -1672,53 +1669,53 @@ SQLload_file(Client cntxt, Tablet *as, b
assert(rsep);
assert(csep);
assert(maxrow < 0 || maxrow <= (lng) BUN_MAX);
- task->fields = (char ***) GDKzalloc(as->nr_attrs * sizeof(char **));
- task->cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
- task->time = (lng *) GDKzalloc(as->nr_attrs * sizeof(lng));
- task->cur = 0;
+ task.fields = (char ***) GDKzalloc(as->nr_attrs * sizeof(char **));
+ task.cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
+ task.time = (lng *) GDKzalloc(as->nr_attrs * sizeof(lng));
+ task.cur = 0;
for (i = 0; i < MAXBUFFERS; i++) {
- task->base[i] = GDKzalloc(MAXROWSIZE(2 * b->size) + 2);
- task->rowlimit[i] = MAXROWSIZE(2 * b->size);
- if (task->base[i] == 0) {
- tablet_error(task, lng_nil, int_nil, MAL_MALLOC_FAIL,
"SQLload_file");
+ task.base[i] = GDKzalloc(MAXROWSIZE(2 * b->size) + 2);
+ task.rowlimit[i] = MAXROWSIZE(2 * b->size);
+ if (task.base[i] == 0) {
+ tablet_error(&task, lng_nil, int_nil, MAL_MALLOC_FAIL,
"SQLload_file");
goto bailout;
}
- task->base[i][b->size + 1] = 0;
- task->input[i] = task->base[i] + 1; /* wrap the buffer with
null bytes */
+ task.base[i][b->size + 1] = 0;
+ task.input[i] = task.base[i] + 1; /* wrap the buffer with
null bytes */
}
- task->besteffort = best;
+ task.besteffort = best;
if (maxrow < 0)
- task->maxrow = BUN_MAX;
+ task.maxrow = BUN_MAX;
else
- task->maxrow = (BUN) maxrow;
+ task.maxrow = (BUN) maxrow;
- if (task->fields == 0 || task->cols == 0 || task->time == 0) {
- tablet_error(task, lng_nil, int_nil, MAL_MALLOC_FAIL,
"SQLload_file");
+ if (task.fields == 0 || task.cols == 0 || task.time == 0) {
+ tablet_error(&task, lng_nil, int_nil, MAL_MALLOC_FAIL,
"SQLload_file");
goto bailout;
}
- task->as = as;
- task->skip = skip;
- task->quote = quote;
- task->csep = csep;
- task->seplen = strlen(csep);
- task->rsep = rsep;
- task->rseplen = strlen(rsep);
- task->errbuf = cntxt->errbuf;
+ task.as = as;
+ task.skip = skip;
+ task.quote = quote;
+ task.csep = csep;
+ task.seplen = strlen(csep);
+ task.rsep = rsep;
+ task.rseplen = strlen(rsep);
+ task.errbuf = cntxt->errbuf;
- MT_sema_init(&task->producer, 0, "task->producer");
- MT_sema_init(&task->consumer, 0, "task->consumer");
- task->ateof = 0;
- task->b = b;
- task->out = out;
+ MT_sema_init(&task.producer, 0, "task.producer");
+ MT_sema_init(&task.consumer, 0, "task.consumer");
+ task.ateof = 0;
+ task.b = b;
+ task.out = out;
#ifdef MLOCK_TST
- mlock(task->fields, as->nr_attrs * sizeof(char *));
- mlock(task->cols, as->nr_attrs * sizeof(int));
- mlock(task->time, as->nr_attrs * sizeof(lng));
+ mlock(task.fields, as->nr_attrs * sizeof(char *));
+ mlock(task.cols, as->nr_attrs * sizeof(int));
+ mlock(task.time, as->nr_attrs * sizeof(lng));
for (i = 0; i < MAXBUFFERS; i++)
- mlock(task->base[i], b->size + 2);
+ mlock(task.base[i], b->size + 2);
#endif
as->error = NULL;
@@ -1728,49 +1725,49 @@ SQLload_file(Client cntxt, Tablet *as, b
/* allocate enough space for pointers into the buffer pool. */
/* the record separator is considered a column */
- task->limit = (int) (b->size / as->nr_attrs + as->nr_attrs);
+ task.limit = (int) (b->size / as->nr_attrs + as->nr_attrs);
for (i = 0; i < as->nr_attrs; i++) {
- task->fields[i] = GDKzalloc(sizeof(char *) * task->limit);
- if (task->fields[i] == 0) {
- if (task->as->error == NULL)
+ task.fields[i] = GDKzalloc(sizeof(char *) * task.limit);
+ if (task.fields[i] == 0) {
+ if (task.as->error == NULL)
as->error = createException(MAL,
"sql.copy_from", MAL_MALLOC_FAIL);
goto bailout;
}
#ifdef MLOCK_TST
- mlock(task->fields[i], sizeof(char *) * task->limit);
+ mlock(task.fields[i], sizeof(char *) * task.limit);
#endif
- task->cols[i] = (int) (i + 1); /* to distinguish non
initialized later with zero */
+ task.cols[i] = (int) (i + 1); /* to distinguish non
initialized later with zero */
}
for (i = 0; i < MAXBUFFERS; i++) {
- task->lines[i] = GDKzalloc(sizeof(char *) * task->limit);
- if (task->lines[i] == NULL) {
- tablet_error(task, lng_nil, int_nil, MAL_MALLOC_FAIL,
"SQLload_file:failed to alloc buffers");
+ task.lines[i] = GDKzalloc(sizeof(char *) * task.limit);
+ if (task.lines[i] == NULL) {
+ tablet_error(&task, lng_nil, int_nil, MAL_MALLOC_FAIL,
"SQLload_file:failed to alloc buffers");
goto bailout;
}
}
- task->rowerror = (bte *) GDKzalloc(sizeof(bte) * task->limit);
- if( task->rowerror == NULL){
- tablet_error(task, lng_nil, int_nil, MAL_MALLOC_FAIL,
"SQLload_file:failed to alloc rowerror buffer");
+ task.rowerror = (bte *) GDKzalloc(sizeof(bte) * task.limit);
+ if( task.rowerror == NULL){
+ tablet_error(&task, lng_nil, int_nil, MAL_MALLOC_FAIL,
"SQLload_file:failed to alloc rowerror buffer");
goto bailout;
}
- MT_create_thread(&task->tid, SQLproducer, (void *) task,
MT_THR_JOINABLE);
+ MT_create_thread(&task.tid, SQLproducer, (void *) &task,
MT_THR_JOINABLE);
#ifdef _DEBUG_TABLET_
mnstr_printf(GDKout, "#parallel bulk load " LLFMT " - " BUNFMT "\n",
- skip, task->maxrow);
+ skip, task.maxrow);
#endif
- task->workers = threads;
+ task.workers = threads;
for (j = 0; j < threads; j++) {
- ptask[j] = *task;
+ ptask[j] = task;
ptask[j].id = j;
ptask[j].cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
if (ptask[j].cols == 0) {
- tablet_error(task, lng_nil, int_nil, MAL_MALLOC_FAIL,
"SQLload_file");
+ tablet_error(&task, lng_nil, int_nil, MAL_MALLOC_FAIL,
"SQLload_file");
goto bailout;
}
#ifdef MLOCK_TST
- mlock(ptask[j].cols, sizeof(char *) * task->limit);
+ mlock(ptask[j].cols, sizeof(char *) * task.limit);
#endif
MT_sema_init(&ptask[j].sema, 0, "ptask[j].sema");
MT_sema_init(&ptask[j].reply, 0, "ptask[j].reply");
@@ -1781,43 +1778,43 @@ SQLload_file(Client cntxt, Tablet *as, b
tio = GDKusec() - tio;
t1 = GDKusec();
#ifdef MLOCK_TST
- mlock(task->b->buf, task->b->size);
+ mlock(task.b->buf, task.b->size);
#endif
- for (firstcol = 0; firstcol < task->as->nr_attrs; firstcol++)
- if (task->as->format[firstcol].c != NULL)
+ for (firstcol = 0; firstcol < task.as->nr_attrs; firstcol++)
+ if (task.as->format[firstcol].c != NULL)
break;
- while (res == 0 && cnt < task->maxrow) {
+ while (res == 0 && cnt < task.maxrow) {
// track how many elements are in the aggregated BATs
- cntstart = BATcount(task->as->format[firstcol].c);
+ cntstart = BATcount(task.as->format[firstcol].c);
/* block until the producer has data available */
- MT_sema_down(&task->consumer);
- cnt += task->top[task->cur];
- if (task->ateof)
+ MT_sema_down(&task.consumer);
+ cnt += task.top[task.cur];
+ if (task.ateof)
break;
t1 = GDKusec() - t1;
total += t1;
iototal += tio;
#ifdef _DEBUG_TABLET_
- mnstr_printf(GDKout, "#Break %d lines\n", task->top[task->cur]);
+ mnstr_printf(GDKout, "#Break %d lines\n", task.top[task.cur]);
#endif
t1 = GDKusec();
- if (task->top[task->cur]) {
+ if (task.top[task.cur]) {
/* activate the workers to break lines */
for (j = 0; j < threads; j++) {
/* stage one, break the lines in parallel */
ptask[j].error = 0;
ptask[j].state = BREAKLINE;
- ptask[j].next = task->top[task->cur];
- ptask[j].fields = task->fields;
- ptask[j].limit = task->limit;
- ptask[j].cnt = task->cnt;
- ptask[j].cur = task->cur;
- ptask[j].top[task->cur] = task->top[task->cur];
+ ptask[j].next = task.top[task.cur];
+ ptask[j].fields = task.fields;
+ ptask[j].limit = task.limit;
+ ptask[j].cnt = task.cnt;
+ ptask[j].cur = task.cur;
+ ptask[j].top[task.cur] = task.top[task.cur];
MT_sema_up(&ptask[j].sema);
}
}
- if (task->top[task->cur]) {
+ if (task.top[task.cur]) {
/* await completion of line break phase */
for (j = 0; j < threads; j++) {
MT_sema_down(&ptask[j].reply);
@@ -1832,13 +1829,13 @@ SQLload_file(Client cntxt, Tablet *as, b
}
#ifdef _DEBUG_TABLET_
mnstr_printf(GDKout, "#fill the BATs %d " BUNFMT " cap "
BUNFMT "\n",
- task->top[task->cur], task->cnt,
- BATcapacity(as->format[task->cur].c));
+ task.top[task.cur], task.cnt,
+ BATcapacity(as->format[task.cur].c));
#endif
lio += GDKusec() - t1; /* line break done */
- if (task->top[task->cur]) {
+ if (task.top[task.cur]) {
if (res == 0) {
- SQLworkdivider(task, ptask, (int) as->nr_attrs,
threads);
+ SQLworkdivider(&task, ptask, (int)
as->nr_attrs, threads);
/* activate the workers to update the BATs */
for (j = 0; j < threads; j++) {
@@ -1852,7 +1849,7 @@ SQLload_file(Client cntxt, Tablet *as, b
tio = t1 - tio;
/* await completion of the BAT updates */
- if (res == 0 && task->top[task->cur]) {
+ if (res == 0 && task.top[task.cur]) {
for (j = 0; j < threads; j++) {
MT_sema_down(&ptask[j].reply);
if (ptask[j].errorcnt > 0 &&
!ptask[j].besteffort) {
@@ -1866,22 +1863,22 @@ SQLload_file(Client cntxt, Tablet *as, b
#define trimerrors(TYPE)
\
do {
\
TYPE *src, *dst;
\
- leftover= BATcount(task->as->format[attr].c);
\
+ leftover= BATcount(task.as->format[attr].c);
\
limit = leftover - cntstart;
\
- dst =src= (TYPE *)
BUNtloc(task->as->format[attr].ci,cntstart); \
+ dst =src= (TYPE *)
BUNtloc(task.as->format[attr].ci,cntstart); \
for(j = 0; j < (int) limit; j++, src++){
\
- if ( task->rowerror[j]){
\
+ if ( task.rowerror[j]){
\
leftover--;
\
continue;
\
}
\
*dst++ = *src;
\
}
\
- BATsetcount(task->as->format[attr].c, leftover );
\
+ BATsetcount(task.as->format[attr].c, leftover );
\
} while (0)
#ifdef _DEBUG_TABLET_
mnstr_printf(GDKout, "#Trim bbest %d table size " BUNFMT " rows
found so far " BUNFMT "\n",
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list