Changeset: 0c62d6045ef9 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0c62d6045ef9
Modified Files:
        monetdb5/modules/mal/tablet.c
Branch: Jan2014
Log Message:

Add easy possibility to disable separate thread to read data in COPY INTO.


diffs (119 lines):

diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -53,6 +53,8 @@
 #include <string.h>
 #include <ctype.h>
 
+#define SQLLOADTHREAD          /* define to get separate reader thread */
+
 static MT_Lock errorlock MT_LOCK_INITIALIZER("errorlock");
 
 static BAT *
@@ -680,8 +682,10 @@ typedef struct {
        lng *time, wtime;                       /* time per col + time per 
thread */
        int rounds;                                     /* how often did we 
divide the work */
        MT_Id tid;
+#ifdef SQLLOADTHREAD
        MT_Sema producer;                       /* reader waits for call */
        MT_Sema consumer;                       /* data available */
+#endif
        int ateof;                                      /* io control */
        bstream *b;
        stream *out;
@@ -1065,6 +1069,7 @@ SQLworkdivider(READERtask *task, READERt
        GDKfree(loc);
 }
 
+#ifdef SQLLOADTHREAD
 /*
  * Reading is handled by a separate task as a preparation for
  * more parallelism
@@ -1088,6 +1093,7 @@ SQLloader(void *p)
                MT_sema_up(&task->consumer, "SQLloader");
        }
 }
+#endif
 
 #define MAXWORKERS     64
 
@@ -1150,8 +1156,10 @@ SQLload_file(Client cntxt, Tablet *as, b
        task->input = task->base + 1;   /* wrap the buffer with null bytes */
        task->base[b->size + 1] = 0;
 
+#ifdef SQLLOADTHREAD
        MT_sema_init(&task->consumer, 0, "task->consumer");
        MT_sema_init(&task->producer, 0, "task->producer");
+#endif
        task->ateof = 0;
        task->b = b;
        task->out = out;
@@ -1207,7 +1215,9 @@ SQLload_file(Client cntxt, Tablet *as, b
                MT_create_thread(&ptask[j].tid, SQLworker, (void *) &ptask[j], 
MT_THR_JOINABLE);
        }
 
+#ifdef SQLLOADTHREAD
        MT_create_thread(&task->tid, SQLloader, (void *) task, MT_THR_JOINABLE);
+#endif
 #ifdef _DEBUG_TABLET_
        mnstr_printf(GDKout, "parallel bulk load " LLFMT " - " LLFMT "\n", 
skip, maxrow);
 #endif
@@ -1383,8 +1393,10 @@ SQLload_file(Client cntxt, Tablet *as, b
                        }
                }
                /* start feeding new data */
+#ifdef SQLLOADTHREAD
                if ((e == NULL || s >= end || e >= end) && cnt < (BUN) maxrow)
                        MT_sema_up(&task->producer, "SQLload_file");
+#endif
                t1 = GDKusec() - t1;
                total += t1;
                iototal += tio;
@@ -1438,10 +1450,16 @@ SQLload_file(Client cntxt, Tablet *as, b
                        for (j = 0; j < threads; j++)
                                MT_sema_down(&ptask[j].reply, "SQLload_file");
                }
+#ifndef SQLLOADTHREAD
+               if ((e == NULL || s >= end || e >= end) && cnt < (BUN) maxrow)
+                       task->ateof = tablet_read_more(task->b, task->out, 
task->b->size - (task->b->len - task->b->pos)) == EOF;
+#endif
                if (task->ateof)
                        break;
+#ifdef SQLLOADTHREAD
                if ((e == NULL || s >= end || e >= end) && cnt < (BUN) maxrow)
                        MT_sema_down(&task->consumer, "SQLload_file");
+#endif
        }
 
        if (task->b->pos < task->b->len && cnt < (BUN) maxrow && task->ateof) {
@@ -1473,7 +1491,9 @@ SQLload_file(Client cntxt, Tablet *as, b
        }
 
        task->ateof = 1;
+#ifdef SQLLOADTHREAD
        MT_sema_up(&task->producer, "SQLload_file");
+#endif
        for (j = 0; j < threads; j++) {
                ptask[j].next = -1;
                MT_sema_up(&ptask[j].sema, "SQLload_file");
@@ -1490,7 +1510,9 @@ SQLload_file(Client cntxt, Tablet *as, b
                MT_sema_destroy(&ptask[j].sema);
                MT_sema_destroy(&ptask[j].reply);
        }
+#ifdef SQLLOADTHREAD
        MT_join_thread(task->tid);
+#endif
 
 #ifdef _DEBUG_TABLET_
        mnstr_printf(GDKout, "Found " BUNFMT " tuples\n", cnt);
@@ -1501,8 +1523,10 @@ SQLload_file(Client cntxt, Tablet *as, b
        GDKfree(task->cols);
        GDKfree(task->time);
        GDKfree(task->base);
+#ifdef SQLLOADTHREAD
        MT_sema_destroy(&task->consumer);
        MT_sema_destroy(&task->producer);
+#endif
        GDKfree(task);
 #ifdef MLOCK_TST
        munlockall();
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to