Changeset: ae8d77654a0e for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ae8d77654a0e
Modified Files:
        gdk/shared_memory.c
        gdk/shared_memory.h
        monetdb5/extras/pyapi/pyapi.c
Branch: pyapi
Log Message:

Clean up a lot of multiprocessing code, as we are never creating more than one
process per PyAPI call anymore.


diffs (truncated from 640 to 300 lines):

diff --git a/gdk/shared_memory.c b/gdk/shared_memory.c
--- a/gdk/shared_memory.c
+++ b/gdk/shared_memory.c
@@ -30,7 +30,6 @@ static int shm_unique_id = 1;
 static int shm_current_id = 0;
 static int shm_max_id = 32;
 static int shm_is_initialized = false;
-static char shm_keystring[] = BINDIR;
 static MT_Lock release_memory_lock;
 static key_t base_key = 800000000;
 
@@ -42,8 +41,6 @@ str release_shared_memory_id(int memory_
 str init_mmap_memory(int id, size_t size, void **ptr, int flags);
 str release_mmap_memory(void *ptr, size_t size);
 
-int init_process_semaphore(int id, int count, int flags);
-
 str initialize_shared_memory(void)
 {
        if (shm_is_initialized) //maybe this should just return MAL_SUCCEED as well
@@ -275,68 +272,6 @@ str release_shared_memory_id(int memory_
        }
        return MAL_SUCCEED;
 }
-
-int init_process_semaphore(int id, int count, int flags)
-{
-    int key = ftok(shm_keystring, id);
-    int semid = -1;
-    if (key == (key_t) -1)
-    {
-        perror("ftok");
-        return semid;
-    }
-    semid = semget(key, count, flags | 0666);
-    if (semid < 0)
-    {
-        perror("semget failed: ");
-    }
-    return semid;
-}
-
-int create_process_semaphore(int id, int count)
-{
-    return init_process_semaphore(id, count, IPC_CREAT);
-}
-
-int get_process_semaphore(int sem_id, int count)
-{
-    return init_process_semaphore(sem_id, count, 0);
-}
-
-int get_semaphore_value(int sem_id, int number)
-{
-    int semval = semctl(sem_id, number, GETVAL, 0);
-    if (semval < 0)
-    {
-        perror("semctl failed: ");
-    }
-    return semval;
-}
-
-int change_semaphore_value(int sem_id, int number, int change)
-{
-    struct sembuf buffer;
-    buffer.sem_num = number;
-    buffer.sem_op = change;
-    buffer.sem_flg = 0;
-
-    if (semop(sem_id, &buffer, 1) < 0)
-    {
-        perror("semop failed: ");
-        return false;
-    }
-    return true;
-}
-
-int release_process_semaphore(int sem_id)
-{
-    if (semctl(sem_id, 0, IPC_RMID) < 0)
-    {
-        perror("semctl failed: ");
-        return false;
-    }
-    return true;
-}
 #else
 //Windows -> Not yet implemented
 #include <stdio.h>
@@ -388,40 +323,4 @@ str get_shared_memory(int id, size_t siz
     NOTIMPLEMENTED();
     return NULL;
 }
-
-
-int create_process_semaphore(int id, int count)
-{
-    (void) id; (void) count;
-    NOTIMPLEMENTED();
-    return -1;
-}
-
-int get_process_semaphore(int id, int count)
-{
-    (void) id; (void) count;
-    NOTIMPLEMENTED();
-    return -1;
-}
-
-int get_semaphore_value(int sem_id, int number)
-{
-    (void) sem_id; (void) number;
-    NOTIMPLEMENTED();
-    return -1;
-}
-
-int change_semaphore_value(int sem_id, int number, int change)
-{
-    (void) sem_id; (void) number; (void) change;
-    NOTIMPLEMENTED();
-    return false;
-}
-
-int release_process_semaphore(int sem_id)
-{
-    (void) sem_id;
-    NOTIMPLEMENTED();
-    return false;
-}
 #endif
diff --git a/gdk/shared_memory.h b/gdk/shared_memory.h
--- a/gdk/shared_memory.h
+++ b/gdk/shared_memory.h
@@ -30,17 +30,4 @@ int get_unique_shared_memory_id(int offs
 //! This is thread safe
 str get_shared_memory(int id, size_t size, void **return_ptr);
 
-//! Returns semaphore ID, id = unique id within the program, count = amount of semaphores
-int create_process_semaphore(int id, int count);
-//! Returns semaphore ID
-int get_process_semaphore(int id, int count);
-//! Returns value of semaphore <number> at semaphore id <sem_id>
-int get_semaphore_value(int sem_id, int number);
-//! Change the semaphore <number> at semaphore id <sem_id> value by <change> (change = 1 means +1, not set the value to 1)
-int change_semaphore_value(int sem_id, int number, int change);
-//! Release semaphore at sem_id
-int release_process_semaphore(int sem_id);
-
-
-
 #endif /* _SHAREDMEMORY_LIB_ */
diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c
--- a/monetdb5/extras/pyapi/pyapi.c
+++ b/monetdb5/extras/pyapi/pyapi.c
@@ -409,12 +409,9 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
     PyInput *pyinput_values = NULL;
     int seqbase = 0;
 #ifndef WIN32
-    bool single_fork = mapped == 1;
     int shm_id = -1;
-    int sem_id = -1;
-    int process_id = 0;
     size_t memory_size = 0;
-    int process_count = 0;
+    bool child_process = false;
 #endif
 #ifdef _PYAPI_TESTING_
     time_storage start_time, end_time;
@@ -422,7 +419,6 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
     unsigned long long peak_memory_usage = 0;
 #endif
     PyGILState_STATE gstate = PyGILState_LOCKED;
-    int j;
     bit varres = sqlfun ? sqlfun->varres : 0;
     int retcols = !varres ? pci->retc : -1;
     size_t iu;
@@ -538,16 +534,8 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
         msg = createException(MAL, "pyapi.eval", "Please visit http://www.linux.com/directory/Distributions to download a Linux distro.\n");
         goto wrapup;
 #else
-        lng *pids = NULL;
+        lng pid;
         char *ptr = NULL;
-        if (single_fork)
-        {
-            process_count = 1;
-        }
-        else
-        {
-            process_count = 8;
-        }
 
         //create initial shared memory
         MT_lock_set(&pyapiLock, "pyapi.evaluate");
@@ -556,9 +544,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
 
         VERBOSE_MESSAGE("Creating multiple processes.\n");
 
-        pids = GDKzalloc(sizeof(lng) * process_count);
-
-        memory_size = pci->retc * process_count * sizeof(ReturnBatDescr); //the memory size for the header files, each process has one per return value
+        memory_size = pci->retc * sizeof(ReturnBatDescr); //the memory size for the header files, each process has one per return value
 
         VERBOSE_MESSAGE("Initializing shared memory.\n");
 
@@ -569,80 +555,47 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
         MT_lock_unset(&pyapiLock, "pyapi.evaluate");
         if (msg != MAL_SUCCEED)
         {
-            GDKfree(pids);
-            process_id = 0;
             goto wrapup;
         }
 
-        if (process_count > 1)
-        {
-            //initialize cross-process semaphore, we use two semaphores
-            //the semaphores are used as follows:
-            //we set the first semaphore to process_count, and the second semaphore to 0
-            //every process first passes the first semaphore (decreasing the value), then tries to pass the second semaphore (which will block, because it is set to 0)
-            //when the final process passes the first semaphore, it checks the value of the first semaphore (which is then equal to 0)
-            //the final process will then set the value of the second semaphore to process_count, allowing all processes to pass
-
-            //this means processes will only start returning values once all the processes are finished, this is done because we want to have one big shared memory block for each return value
-            //and we can only create that block when we know how many return values there are, which we only know when all the processes have returned
-
-            sem_id = create_process_semaphore(shm_id, 2);
-            change_semaphore_value(sem_id, 0, process_count);
-        }
-
         VERBOSE_MESSAGE("Waiting to fork.\n");
         //fork
         MT_lock_set(&pyapiLock, "pyapi.evaluate");
         VERBOSE_MESSAGE("Start forking.\n");
-        for(i = 0; i < process_count; i++)
+        if ((pid = fork()) < 0)
         {
-            if ((pids[i] = fork()) < 0)
-            {
-                msg = createException(MAL, "pyapi.eval", "Failed to fork process");
-                MT_lock_unset(&pyapiLock, "pyapi.evaluate");
+            msg = createException(MAL, "pyapi.eval", "Failed to fork process");
+            MT_lock_unset(&pyapiLock, "pyapi.evaluate");
 
-                if (process_count > 1) release_process_semaphore(sem_id);
-                release_shared_memory(ptr);
-                GDKfree(pids);
+            release_shared_memory(ptr);
 
-                process_id = 0;
-                goto wrapup;
-            }
-            else if (pids[i] == 0)
-            {
+            goto wrapup;
+        }
+        else if (pid == 0)
+        {
+            child_process = true;
 #ifdef _PYAPI_TESTING_
-                if (!disable_testing && !option_disablemalloctracking && benchmark_output != NULL) { init_hook(); }
+            if (!disable_testing && !option_disablemalloctracking && benchmark_output != NULL) { init_hook(); }
 #endif
-                break;
-            }
         }
 
-        process_id = i + 1;
-        if (i == process_count)
+        if (!child_process)
         {
             //main process
-            int failedprocess = 0;
-            int current_process = process_count;
+            int status;
             bool success = true;
 
             //wait for child processes
             MT_lock_unset(&pyapiLock, "pyapi.evaluate");
-            while(current_process > 0)
-            {
-                int status;
-                waitpid(pids[current_process - 1], &status, 0);
-                if (status != 0)
-                {
-                    failedprocess = current_process - 1;
-                    success = false;
-                }
-                current_process--;
-            }
+
+            waitpid(pid, &status, 0);
+            if (status != 0)
+                success = false;
 
             if (!success)
             {
                 //a child failed, get the error message from the child
-                ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[failedprocess * pci->retc + 0]);
+                ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[0]);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to