Changeset: 0c62d6045ef9 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0c62d6045ef9
Modified Files:
monetdb5/modules/mal/tablet.c
Branch: Jan2014
Log Message:
Add easy possibility to disable separate thread to read data in COPY INTO.
diffs (119 lines):
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -53,6 +53,8 @@
#include <string.h>
#include <ctype.h>
+#define SQLLOADTHREAD /* define to get separate reader thread */
+
static MT_Lock errorlock MT_LOCK_INITIALIZER("errorlock");
static BAT *
@@ -680,8 +682,10 @@ typedef struct {
lng *time, wtime; /* time per col + time per
thread */
int rounds; /* how often did we
divide the work */
MT_Id tid;
+#ifdef SQLLOADTHREAD
MT_Sema producer; /* reader waits for call */
MT_Sema consumer; /* data available */
+#endif
int ateof; /* io control */
bstream *b;
stream *out;
@@ -1065,6 +1069,7 @@ SQLworkdivider(READERtask *task, READERt
GDKfree(loc);
}
+#ifdef SQLLOADTHREAD
/*
* Reading is handled by a separate task as a preparation for
* more parallelism
@@ -1088,6 +1093,7 @@ SQLloader(void *p)
MT_sema_up(&task->consumer, "SQLloader");
}
}
+#endif
#define MAXWORKERS 64
@@ -1150,8 +1156,10 @@ SQLload_file(Client cntxt, Tablet *as, b
task->input = task->base + 1; /* wrap the buffer with null bytes */
task->base[b->size + 1] = 0;
+#ifdef SQLLOADTHREAD
MT_sema_init(&task->consumer, 0, "task->consumer");
MT_sema_init(&task->producer, 0, "task->producer");
+#endif
task->ateof = 0;
task->b = b;
task->out = out;
@@ -1207,7 +1215,9 @@ SQLload_file(Client cntxt, Tablet *as, b
MT_create_thread(&ptask[j].tid, SQLworker, (void *) &ptask[j],
MT_THR_JOINABLE);
}
+#ifdef SQLLOADTHREAD
MT_create_thread(&task->tid, SQLloader, (void *) task, MT_THR_JOINABLE);
+#endif
#ifdef _DEBUG_TABLET_
mnstr_printf(GDKout, "parallel bulk load " LLFMT " - " LLFMT "\n",
skip, maxrow);
#endif
@@ -1383,8 +1393,10 @@ SQLload_file(Client cntxt, Tablet *as, b
}
}
/* start feeding new data */
+#ifdef SQLLOADTHREAD
if ((e == NULL || s >= end || e >= end) && cnt < (BUN) maxrow)
MT_sema_up(&task->producer, "SQLload_file");
+#endif
t1 = GDKusec() - t1;
total += t1;
iototal += tio;
@@ -1438,10 +1450,16 @@ SQLload_file(Client cntxt, Tablet *as, b
for (j = 0; j < threads; j++)
MT_sema_down(&ptask[j].reply, "SQLload_file");
}
+#ifndef SQLLOADTHREAD
+ if ((e == NULL || s >= end || e >= end) && cnt < (BUN) maxrow)
+ task->ateof = tablet_read_more(task->b, task->out,
task->b->size - (task->b->len - task->b->pos)) == EOF;
+#endif
if (task->ateof)
break;
+#ifdef SQLLOADTHREAD
if ((e == NULL || s >= end || e >= end) && cnt < (BUN) maxrow)
MT_sema_down(&task->consumer, "SQLload_file");
+#endif
}
if (task->b->pos < task->b->len && cnt < (BUN) maxrow && task->ateof) {
@@ -1473,7 +1491,9 @@ SQLload_file(Client cntxt, Tablet *as, b
}
task->ateof = 1;
+#ifdef SQLLOADTHREAD
MT_sema_up(&task->producer, "SQLload_file");
+#endif
for (j = 0; j < threads; j++) {
ptask[j].next = -1;
MT_sema_up(&ptask[j].sema, "SQLload_file");
@@ -1490,7 +1510,9 @@ SQLload_file(Client cntxt, Tablet *as, b
MT_sema_destroy(&ptask[j].sema);
MT_sema_destroy(&ptask[j].reply);
}
+#ifdef SQLLOADTHREAD
MT_join_thread(task->tid);
+#endif
#ifdef _DEBUG_TABLET_
mnstr_printf(GDKout, "Found " BUNFMT " tuples\n", cnt);
@@ -1501,8 +1523,10 @@ SQLload_file(Client cntxt, Tablet *as, b
GDKfree(task->cols);
GDKfree(task->time);
GDKfree(task->base);
+#ifdef SQLLOADTHREAD
MT_sema_destroy(&task->consumer);
MT_sema_destroy(&task->producer);
+#endif
GDKfree(task);
#ifdef MLOCK_TST
munlockall();
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list