Changeset: ff62644620e6 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ff62644620e6
Modified Files:
sql/backends/monet5/miniseed/79_registrar.sql
sql/backends/monet5/miniseed/registrar.c
sql/backends/monet5/miniseed/registrar.mal
Branch: DVframework
Log Message:
registrar became multi-threaded (optional).
diffs (truncated from 450 to 300 lines):
diff --git a/sql/backends/monet5/miniseed/79_registrar.sql
b/sql/backends/monet5/miniseed/79_registrar.sql
--- a/sql/backends/monet5/miniseed/79_registrar.sql
+++ b/sql/backends/monet5/miniseed/79_registrar.sql
@@ -17,7 +17,7 @@ Copyright August 2008-2012 MonetDB B.V.
All Rights Reserved.
*/
-CREATE PROCEDURE register_repo(repo string, mode int)
+CREATE PROCEDURE register_repo(repo string, mode int, num_threads int)
external name registrar.register_repo;
diff --git a/sql/backends/monet5/miniseed/registrar.c
b/sql/backends/monet5/miniseed/registrar.c
--- a/sql/backends/monet5/miniseed/registrar.c
+++ b/sql/backends/monet5/miniseed/registrar.c
@@ -9,6 +9,10 @@
#include "sql_mvc.h"
#include "sql.h"
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
+
/*
* keeps BAT and other properties of columns of a table.
*/
@@ -29,6 +33,16 @@ typedef struct {
sht num_tables;
} temp_container;
+typedef struct { /* per-thread argument block; register_files copies it from its void* argument */
+ int tid; /* thread id; register_files uses it to label its progress and error messages */
+ str *file_paths; /* array of file paths; the thread processes file_paths[loop_start] .. file_paths[loop_end - 1] */
+ long loop_start; /* first index into file_paths handled by this thread (inclusive) */
+ long loop_end; /* one past the last index into file_paths handled by this thread (exclusive) */
+ int mode; /* 0: plain temp_container + mseed_register; otherwise: container with data tables + mseed_register_and_mount */
+ Client cntxt; /* client context, handed on to prepare_insertion and insert_into_vault */
+ int *function_created; /* flag read and set under create_lock: 0 until a prepare_insertion call has succeeded, then 1; presumably shared by all threads -- confirm in register_repo */
+} thread_argv;
+
lng get_line_num(str filename);
lng get_file_paths(str repo_path, str** ret_file_paths);
str mseed_create_temp_container(temp_container* ret_tc);
@@ -37,9 +51,13 @@ str mseed_register(str file_path, temp_c
str mseed_register_and_mount(str file_path, temp_container* ret_tc);
int concatenate_strs(str* words_to_concat, int num_words_to_concat, str*
ret_concatenated);
str prepare_insertion(Client cntxt, temp_container* tc);
-str insert_into_vault(Client cntxt, MalBlkPtr mb, temp_container* tc);
+str insert_into_vault(Client cntxt, temp_container* tc);
str SQLstatementIntern(Client c, str *expr, str nme, int execute, bit output);
str register_clean_up(temp_container* tc);
+void *register_files(void *args);
+
+pthread_mutex_t create_lock; /* held by register_files around the *function_created check and the prepare_insertion call */
+pthread_mutex_t insert_lock; /* held by register_files around the insert_into_vault call */
/*
* returns number of lines in a file.
@@ -462,14 +480,13 @@ str prepare_insertion(Client cntxt, temp
*
* returns error or MAL_SUCCEED.
*/
-str insert_into_vault(Client cntxt, MalBlkPtr mb, temp_container* tc)
+str insert_into_vault(Client cntxt, temp_container* tc)
{
/* form a sql query str like this: */
/* INSERT INTO mseed.files SELECT * FROM mseed_files_reg(ticket, table_idx); */
int t;
long ticket = (long) tc;
- mvc *m = NULL;
str msg;
for(t = 0; t < tc->num_tables; t++)
@@ -484,16 +501,6 @@ str insert_into_vault(Client cntxt, MalB
}
- if((msg = getSQLContext(cntxt, mb, &m, NULL))!= MAL_SUCCEED)
- {/* getting mvc failed, what to do */
- return msg;
- }
-
- if(mvc_commit(m, 0, NULL) < 0)
- {/* committing failed */
- throw(MAL,"registrar.insert_into_vault", "committing failed\n");
- }
-
return MAL_SUCCEED;
}
@@ -547,6 +554,7 @@ str mseed_register(str file_path, temp_c
{
MSRecord *msr = NULL;
+ MSFileParam *msfp = NULL;
int retcode;
short int verbose = 1;
BAT *aBAT = NULL;
@@ -557,7 +565,8 @@ str mseed_register(str file_path, temp_c
str ch = (str) GDKmalloc(2*sizeof(char));
ch[1] = '\0';
- while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1, 0,
verbose)) == MS_NOERROR)
+ /* while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1, 0,
verbose)) == MS_NOERROR) */
+ while ((retcode = ms_readmsr_r (&msfp, &msr, file_path, 0, NULL, NULL,
1, 0, verbose)) == MS_NOERROR)
{
if(!files_done)
{
@@ -647,8 +656,11 @@ str mseed_register(str file_path, temp_c
seq_no_fake++;
}
+ GDKfree(ch);
+
/* Cleanup memory and close file */
- ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0);
+ /* ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0); */
+ ms_readmsr_r (&msfp, &msr, NULL, 0, NULL, NULL, 0, 0, 0);
if ( retcode != MS_ENDOFFILE )
throw(MAL, "mseed_register", "Cannot read %s: %s\n", file_path,
ms_errorstr(retcode));
@@ -671,6 +683,7 @@ str mseed_register_and_mount(str file_pa
{
MSRecord *msr = NULL;
+ MSFileParam *msfp = NULL;
int retcode;
short int verbose = 1;
short int data_flag = 1;
@@ -684,7 +697,8 @@ str mseed_register_and_mount(str file_pa
str ch = (str) GDKmalloc(2*sizeof(char));
ch[1] = '\0';
- while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1,
data_flag, verbose)) == MS_NOERROR)
+ /* while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1,
data_flag, verbose)) == MS_NOERROR) */
+ while ((retcode = ms_readmsr_r (&msfp, &msr, file_path, 0, NULL, NULL,
1, data_flag, verbose)) == MS_NOERROR)
{
if(!files_done)
{
@@ -814,7 +828,8 @@ str mseed_register_and_mount(str file_pa
}
/* Cleanup memory and close file */
- ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0);
+ /* ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0); */
+ ms_readmsr_r (&msfp, &msr, NULL, 0, NULL, NULL, 0, 0, 0);
if ( retcode != MS_ENDOFFILE )
throw(MAL, "mseed_register", "Cannot read %s: %s\n", file_path,
ms_errorstr(retcode));
@@ -822,6 +837,90 @@ str mseed_register_and_mount(str file_pa
return MAL_SUCCEED;
}
+/*
+ * thread entry point: registers the files file_paths[loop_start] ..
+ * file_paths[loop_end - 1] of the thread_argv that args points to.
+ *
+ * builds a private temp_container (layout chosen by mode), fills it with
+ * mseed_register (mode 0) or mseed_register_and_mount (otherwise), makes
+ * sure prepare_insertion has succeeded once (guarded by create_lock and the
+ * *function_created flag), inserts the container with insert_into_vault
+ * (serialized by insert_lock) and finally calls register_clean_up.
+ *
+ * a file that cannot be registered is reported on stdout and skipped.
+ *
+ * returns NULL on success, otherwise whatever throw() returns (presumably
+ * the exception string -- confirm against the caller's pthread_join).
+ * neither mutex is held when the function returns, on any path.
+ */
+void *register_files(void *args)
+{
+	thread_argv targv = *((thread_argv*)args);
+	temp_container *tc;
+	long i;
+	str err = NULL;
+	long start, finish;
+
+	/* create temp_container */
+	tc = (temp_container*)GDKmalloc(sizeof(temp_container));
+	if(tc == NULL)
+	{/* explicit check: an assert disappears when compiled with NDEBUG */
+		throw(MAL,"registrar.register_repo",
+			"temp_container allocation failed in thread %d\n",
+			targv.tid);
+	}
+	if(targv.mode == 0)
+		err = mseed_create_temp_container(tc);
+	else
+		err = mseed_create_temp_container_with_data_tables(tc);
+	if(err != MAL_SUCCEED)
+	{/* temp_container creation failed */
+		/* NOTE(review): tc is not released here; whether it is safe to
+		 * free a partially built container is not visible from this
+		 * function -- confirm and add the clean-up. */
+		throw(MAL,"registrar.register_repo",
+			"temp_container creation failed in thread %d: %s\n",
+			targv.tid, err);
+	}
+
+	start = GDKms();
+	/* loop through this thread's share of the file_paths in repo */
+	if(targv.mode == 0)
+	{
+		for(i = targv.loop_start; i < targv.loop_end; i++)
+		{
+			err = mseed_register(targv.file_paths[i], tc);
+			if(err != MAL_SUCCEED)
+			{/* report and carry on with the next file */
+				printf("registrar.register_repo: current file "
+					"cannot be registered in thread %d: %s\n",
+					targv.tid, err);
+			}
+		}
+	}
+	else
+	{
+		for(i = targv.loop_start; i < targv.loop_end; i++)
+		{
+			err = mseed_register_and_mount(targv.file_paths[i], tc);
+			if(err != MAL_SUCCEED)
+			{/* report and carry on with the next file */
+				printf("registrar.register_repo: current file "
+					"cannot be registered and/or mounted "
+					"in thread %d: %s\n",
+					targv.tid, err);
+			}
+		}
+	}
+	finish = GDKms();
+	printf("# In thread %d, time for extraction and transformation of "
+		"(meta-)data: %ld milliseconds\n", targv.tid, finish - start);
+
+	/* a per-file failure above must not be mistaken for a prepare failure */
+	err = MAL_SUCCEED;
+
+	pthread_mutex_lock(&create_lock);
+	if(*targv.function_created == 0)
+	{
+		/* prepare sql functions for inserting temp_container into
+		 * tables_to_be_filled */
+		err = prepare_insertion(targv.cntxt, tc);
+		if(err == MAL_SUCCEED)
+			*targv.function_created = 1;
+	}
+	/* unlock BEFORE any return: leaving with the lock held would block
+	 * every other thread in pthread_mutex_lock forever */
+	pthread_mutex_unlock(&create_lock);
+	if(err != MAL_SUCCEED)
+	{/* preparing the insertion failed */
+		throw(MAL,"registrar.register_repo",
+			"Insertion prepare failed in thread %d: %s\n",
+			targv.tid, err);
+	}
+
+	pthread_mutex_lock(&insert_lock);
+	start = GDKms();
+	/* insert temp_container into tables_to_be_filled */
+	err = insert_into_vault(targv.cntxt, tc);
+	finish = GDKms();
+	/* same reasoning as for create_lock: release before returning */
+	pthread_mutex_unlock(&insert_lock);
+	if(err != MAL_SUCCEED)
+	{/* inserting the temp_container into one of the tables failed */
+		throw(MAL,"registrar.register_repo",
+			"Inserting the temp_container into one of the tables "
+			"failed in thread %d: %s\n", targv.tid, err);
+	}
+	printf("# In thread %d, time for loading of (meta-)data: %ld "
+		"milliseconds\n", targv.tid, finish - start);
+
+	err = register_clean_up(tc);
+	if(err != MAL_SUCCEED)
+	{/* cleaning up the temp_container failed */
+		throw(MAL,"registrar.register_repo",
+			"Cleaning up the temp_container failed in thread %d: %s\n",
+			targv.tid, err);
+	}
+
+	return NULL;
+}
+
/*
* takes a repository path repo_path, finds out the files in it, creates a
* temp_container of the metadata to be inserted, for each file calls the
@@ -837,12 +936,15 @@ str register_repo(Client cntxt, MalBlkPt
{
str *repo_path = (str*) getArgReference(stk,pci,pci->retc); /* arg 1:
repo_path */
int mode = *(int*) getArgReference(stk,pci,pci->retc+1); /* arg 2: mode
0:register only, mode 1: register+mount */
+ int num_threads = *(int*) getArgReference(stk,pci,pci->retc+2); /* arg
3: 1: no threads, >1: multi-threaded */
str *file_paths = NULL;
long num_file_paths;
temp_container *tc;
long i;
str err = NULL;
long start, finish;
+ int function_created = 0;
+ mvc *m = NULL;
/* fetch file_paths from repo_path */
num_file_paths = get_file_paths(*repo_path, &file_paths);
@@ -851,70 +953,129 @@ str register_repo(Client cntxt, MalBlkPt
throw(MAL,"registrar.register_repo", "Problematic repository:
%s\n", err);
}
- /* create temp_container */
- tc = (temp_container*)GDKmalloc(sizeof(temp_container));
- assert(tc != NULL);
- if(mode == 0)
- err = mseed_create_temp_container(tc); /* depending on design
can get different argument(s) */
- else
- err = mseed_create_temp_container_with_data_tables(tc); /*
depending on design can get different argument(s) */
- if(err != MAL_SUCCEED)
- {/* temp_container creation failed, what to do */
- throw(MAL,"registrar.register_repo", "temp_container creation
failed: %s\n", err);
- }
-
- start = GDKms();
- /* loop through the file_paths in repo */
- if(mode == 0)
+ if(num_threads > 1)
{
- for(i = 0; i < num_file_paths; i++)
+
+ /* multi-threaded */
+ long loop_start = 0;
+ long num_file_paths_per_thread = num_file_paths / num_threads;
+ pthread_t *threads =
(pthread_t*)GDKmalloc(num_threads*sizeof(pthread_t));
+ thread_argv *targvs =
(thread_argv*)GDKmalloc(num_threads*sizeof(thread_argv));
+ int j;
+
+ if (pthread_mutex_init(&insert_lock, NULL) != 0 ||
pthread_mutex_init(&create_lock, NULL) != 0)
{
- err = mseed_register(file_paths[i], tc);
- if(err != MAL_SUCCEED)
- {/* current file cannot be registered, what to do */
- /* throw(MAL,"registrar.register_repo", "Current
file cannot be registered: %s\n", err); */
- printf("registrar.register_repo: current file
cannot be registered: %s\n", err);
- }
+ throw(MAL,"registrar.register_repo", "mutex init
failed\n");
}
+
+ for(j = 0; j < num_threads; j++)
+ {
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list