Changeset: 940bd2cd4328 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=940bd2cd4328
Modified Files:
monetdb5/modules/mal/tablet.c
Branch: resultset
Log Message:
Fixes to COPY INTO code.
- Wait for SQLproducer to exit (potential crashes otherwise);
- Avoid deadlock in debug code;
- Simplify waiting for other buffer.
diffs (155 lines):
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -894,16 +894,12 @@ SQLload_parse_line(READERtask *task, int
if (*line == task->quote) {
skip = 1;
#ifdef _DEBUG_TABLET_
- //MT_lock_set(&errorlock, "insert_val");
mnstr_printf(GDKout, "before #1 %s\n", s =
line);
- //MT_lock_unset(&errorlock, "insert_val");
#endif
task->fields[i][idx] = line + 1;
line = tablet_skip_string(line + 1,
task->quote);
#ifdef _DEBUG_TABLET_
- //MT_lock_set(&errorlock, "insert_val");
mnstr_printf(GDKout, "after #1 %s\n", s);
- //MT_lock_unset(&errorlock, "insert_val");
#endif
if (!line) {
str errline = SQLload_error(task,
task->top[task->cur]);
@@ -957,9 +953,7 @@ SQLload_parse_line(READERtask *task, int
for (i = 0; i < as->nr_attrs; i++) {
task->fields[i][idx] = line;
#ifdef _DEBUG_TABLET_
- MT_lock_set(&errorlock, "insert_val");
mnstr_printf(GDKout, "before #2 %s\n", line);
- //MT_lock_unset(&errorlock, "insert_val");
#endif
/* eat away the column separator */
for (; *line; line++)
@@ -972,9 +966,7 @@ SQLload_parse_line(READERtask *task, int
goto endoffield2;
}
#ifdef _DEBUG_TABLET_
- //MT_lock_set(&errorlock, "insert_val");
mnstr_printf(GDKout, "#after #23 %s\n", line);
- MT_lock_unset(&errorlock, "insert_val");
#endif
/* not enough fields */
if (i < as->nr_attrs - 1) {
@@ -1160,7 +1152,7 @@ SQLproducer(void *p)
consoleinput = 1;
goto parseSTDIN;
}
- while (cnt <= task->maxrow) {
+ for (;;) {
ateof[cur] = tablet_read_more(task->b, task->out,
task->b->size) == EOF;
#ifdef _DEBUG_TABLET_CNTRL
if (ateof[cur] == 0)
@@ -1360,7 +1352,7 @@ SQLproducer(void *p)
mnstr_printf(GDKout, "#SQL producer got buffer %d filled with
%d records \n",
cur, task->top[cur]);
#endif
- if (consoleinput && cnt <= task->maxrow) {
+ if (consoleinput) {
task->cur = cur;
task->ateof = ateof[cur];
task->cnt = bufcnt[cur];
@@ -1368,57 +1360,53 @@ SQLproducer(void *p)
MT_sema_up(&task->consumer, "SQLconsumer");
/* then wait until it is done */
MT_sema_down(&task->producer, "SQLproducer");
- } else if (cnt <= task->maxrow) {
- if (blocked[cur] == 0 && blocked[(cur + 1) %
MAXBUFFERS] != 1) {
- blocked[cur] = 1;
- task->cur = cur;
- task->ateof = ateof[cur];
- task->cnt = bufcnt[cur];
-#ifdef _DEBUG_TABLET_CNTRL
- mnstr_printf(GDKout, "#Let consumer start on
buffer %d ateof %d\n",
- cur, ateof[cur]);
-#endif
- MT_sema_up(&task->consumer, "SQLconsumer");
+ if (cnt == task->maxrow) {
+ THRdel(thr);
+ return;
}
-
+ } else {
+ assert(!blocked[cur]);
if (blocked[(cur + 1) % MAXBUFFERS]) {
+ /* first wait until other buffer is done */
#ifdef _DEBUG_TABLET_CNTRL
mnstr_printf(GDKout, "#wait for consumers to
finish buffer %d\n",
(cur + 1) %
MAXBUFFERS);
#endif
MT_sema_down(&task->producer, "SQLproducer");
- if (task->state == ENDOFCOPY || task->ateof) {
+ blocked[(cur + 1) % MAXBUFFERS] = 0;
+ if (task->state == ENDOFCOPY) {
THRdel(thr);
return;
}
- blocked[(cur + 1) % MAXBUFFERS] = 0;
- blocked[cur] = 1;
- task->cur = cur;
- task->ateof = ateof[cur];
- task->cnt = bufcnt[cur];
+ }
+ /* other buffer is done, proceed with current buffer */
+ assert(!blocked[(cur + 1) % MAXBUFFERS]);
+ blocked[cur] = 1;
+ task->cur = cur;
+ task->ateof = ateof[cur];
+ task->cnt = bufcnt[cur];
#ifdef _DEBUG_TABLET_CNTRL
- mnstr_printf(GDKout, "#SQL producer got buffer
%d filled with %d records \n",
- cur, task->top[cur]);
+ mnstr_printf(GDKout, "#SQL producer got buffer %d
filled with %d records \n",
+ cur, task->top[cur]);
#endif
- MT_sema_up(&task->consumer, "SQLconsumer");
- cur = (cur + 1) % MAXBUFFERS;
+ MT_sema_up(&task->consumer, "SQLconsumer");
+
+ cur = (cur + 1) % MAXBUFFERS;
#ifdef _DEBUG_TABLET_CNTRL
- mnstr_printf(GDKout, "#May continue with buffer
%d\n", cur);
+ mnstr_printf(GDKout, "#May continue with buffer %d\n",
cur);
#endif
- } else
- cur = (cur + 1) % MAXBUFFERS;
+ if (cnt == task->maxrow) {
+ MT_sema_down(&task->producer, "SQLproducer");
+#ifdef _DEBUG_TABLET_CNTRL
+ mnstr_printf(GDKout, "#Producer delivered
all\n");
+#endif
+ THRdel(thr);
+ return;
+ }
}
#ifdef _DEBUG_TABLET_CNTRL
mnstr_printf(GDKout, "#Continue producer buffer %d\n", cur);
#endif
- /* we have seen and sent all tuples requested? */
- if (cnt == task->maxrow && blocked[(cur + 1) % MAXBUFFERS] == 0
&& blocked[cur] == 0) {
-#ifdef _DEBUG_TABLET_CNTRL
- mnstr_printf(GDKout, "#Producer delivered all\n");
-#endif
- THRdel(thr);
- return;
- }
/* we ran out of input? */
if (task->ateof) {
#ifdef _DEBUG_TABLET_CNTRL
@@ -1792,8 +1780,8 @@ SQLload_file(Client cntxt, Tablet *as, b
mnstr_printf(GDKout, "#Shut down reader\n");
#endif
MT_sema_up(&task->producer, "SQLload_file");
- MT_join_thread(task->tid);
}
+ MT_join_thread(task->tid);
// await completion of the BAT syncs
for (j = 0; j < threads; j++)
MT_sema_down(&ptask[j].reply, "SQLload_file");
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list