Changeset: c6f8733a08cf for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=c6f8733a08cf
Modified Files:
gdk/shared_memory.c
gdk/shared_memory.h
monetdb5/extras/pyapi/pyapi.c
Branch: pyapi
Log Message:
Better error reporting for multiprocessing/shared memory.
diffs (truncated from 391 to 300 lines):
diff --git a/gdk/shared_memory.c b/gdk/shared_memory.c
--- a/gdk/shared_memory.c
+++ b/gdk/shared_memory.c
@@ -3,8 +3,7 @@
#ifndef _WIN32
-#include "monetdb_config.h"
-#include "gdk.h"
+#include "../monetdb5/mal/mal_exception.h"
#include <stdlib.h>
#include <assert.h>
@@ -29,24 +28,30 @@ static int shm_current_id = 0;
static int shm_max_id = 32;
static int shm_is_initialized = false;
static char shm_keystring[] = ".";
+static MT_Lock release_memory_lock;
-void *init_shared_memory(int id, size_t size, int flags);
+str init_shared_memory(int id, size_t size, void **ptr, int flags);
void store_shared_memory(int memory_id, void *ptr);
-int release_shared_memory_id(int memory_id, void *ptr);
+str release_shared_memory_id(int memory_id, void *ptr);
int init_process_semaphore(int id, int count, int flags);
-void initialize_shared_memory(void)
+str initialize_shared_memory(void)
{
- if (shm_is_initialized) return;
-
+ if (shm_is_initialized) //maybe this should just return MAL_SUCCEED as well
+ return createException(MAL, "shared_memory.init", "Attempting to initialize shared memory when it was already initialized.");
+
+ //initialize the pointer to memory ID structure
shm_ptrs = malloc(shm_max_id * sizeof(void*));
shm_memory_ids = malloc(shm_max_id * sizeof(int));
shm_current_id = 0;
shm_max_id = 32;
shm_unique_id = 2;
- shm_is_initialized = true;
+ MT_lock_init(&release_memory_lock, "release_memory_lock");
+
+ shm_is_initialized = true;
+ return MAL_SUCCEED;
}
void store_shared_memory(int memory_id, void *ptr)
@@ -95,17 +100,17 @@ int get_unique_shared_memory_id(int offs
return id;
}
-void* create_shared_memory(int id, size_t size)
+str create_shared_memory(int id, size_t size, void **return_ptr)
{
- return init_shared_memory(id, size, IPC_CREAT);
+ return init_shared_memory(id, size, return_ptr, IPC_CREAT);
}
-void *get_shared_memory(int id, size_t size)
+str get_shared_memory(int id, size_t size, void **return_ptr)
{
- return init_shared_memory(id, size, 0);
+ return init_shared_memory(id, size, return_ptr, 0);
}
-void *init_shared_memory(int id, size_t size, int flags)
+str init_shared_memory(int id, size_t size, void **return_ptr, int flags)
{
int shmid;
void *ptr;
@@ -113,8 +118,9 @@ void *init_shared_memory(int id, size_t
int key = ftok(shm_keystring, id);
if (key == (key_t) -1)
{
- perror("ftok");
- return NULL;
+ char *err = strerror(errno);
+ errno = 0;
+ return createException(MAL, "shared_memory.get", "Error calling ftok(keystring:%s,id:%d): %s", shm_keystring, id, err);
}
assert(shm_is_initialized);
@@ -122,8 +128,9 @@ void *init_shared_memory(int id, size_t
shmid = shmget(key, size, flags | 0666);
if (shmid < 0)
{
- perror("shmget");
- return NULL;
+ char *err = strerror(errno);
+ errno = 0;
+ return createException(MAL, "shared_memory.get", "Error calling shmget(key:%d,size:%zu,flags:%d): %s", key, size, flags, err);
}
//check if the shared memory segment is already created, if it is we do not need to add it to the table and can simply return the pointer
@@ -131,28 +138,32 @@ void *init_shared_memory(int id, size_t
{
if (shm_memory_ids[i] == shmid)
{
- return shm_ptrs[i];
+ if (return_ptr != NULL) *return_ptr = shm_ptrs[i];
+ return MAL_SUCCEED;
}
}
ptr = shmat(shmid, NULL, 0);
if (ptr == (void*)-1)
{
- perror("shmat");
- return NULL;
+ char *err = strerror(errno);
+ errno = 0;
+ return createException(MAL, "shared_memory.get", "Error calling shmat(id:%d,NULL,0): %s", shmid, err);
}
store_shared_memory(shmid, ptr);
- return ptr;
+ if (return_ptr != NULL) *return_ptr = ptr;
+ return MAL_SUCCEED;
}
-int release_shared_memory(void *ptr)
+str release_shared_memory(void *ptr)
{
int i = 0;
int memory_id = -1;
assert(shm_is_initialized);
+ MT_lock_set(&release_memory_lock, "release_memory_lock");
//find the memory_id accompanying the given pointer in the structure
for(i = 0; i < shm_current_id; i++)
{
@@ -164,25 +175,28 @@ int release_shared_memory(void *ptr)
break;
}
}
+ MT_lock_unset(&release_memory_lock, "release_memory_lock");
assert(memory_id);
//actually release the memory at the given ID
return release_shared_memory_id(memory_id, ptr);
}
-int release_shared_memory_id(int memory_id, void *ptr)
+str release_shared_memory_id(int memory_id, void *ptr)
{
if (shmctl(memory_id, IPC_RMID, NULL) == -1)
{
- perror("shmctl");
- return false;
+ char *err = strerror(errno);
+ errno = 0;
+ return createException(MAL, "shared_memory.release", "Error calling shmctl(id:%d,IPC_RMID,NULL): %s", memory_id, err);
}
if (shmdt(ptr) == -1)
{
- perror("shmdt");
- return false;
+ char *err = strerror(errno);
+ errno = 0;
+ return createException(MAL, "shared_memory.release", "Error calling shmdt(ptr:%p): %s", ptr, err);
}
- return true;
+ return MAL_SUCCEED;
}
int init_process_semaphore(int id, int count, int flags)
diff --git a/gdk/shared_memory.h b/gdk/shared_memory.h
--- a/gdk/shared_memory.h
+++ b/gdk/shared_memory.h
@@ -14,18 +14,21 @@
#ifndef _SHAREDMEMORY_LIB_
#define _SHAREDMEMORY_LIB_
+#include "monetdb_config.h"
+#include "gdk.h"
+
#include <stddef.h>
//! Initialize the shared memory module
-void initialize_shared_memory(void);
+str initialize_shared_memory(void);
//! Not thread safe
-void* create_shared_memory(int id, size_t size);
-//! Not thread safe
-int release_shared_memory(void *ptr);
+str create_shared_memory(int id, size_t size, void **return_ptr);
+//! This is thread safe
+str release_shared_memory(void *ptr);
//! Not thread safe
int get_unique_shared_memory_id(int offset);
//! This is thread safe
-void *get_shared_memory(int id, size_t size);
+str get_shared_memory(int id, size_t size, void **return_ptr);
//! Returns semaphore ID, id = unique id within the program, count = amount of semaphores
int create_process_semaphore(int id, int count);
diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c
--- a/monetdb5/extras/pyapi/pyapi.c
+++ b/monetdb5/extras/pyapi/pyapi.c
@@ -532,13 +532,13 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
VERBOSE_MESSAGE("Initializing shared memory.\n");
+ assert(memory_size > 0);
//create the shared memory for the header
MT_lock_set(&pyapiLock, "pyapi.evaluate");
- ptr = create_shared_memory(shm_id, memory_size);
+ msg = create_shared_memory(shm_id, memory_size, (void**) &ptr);
MT_lock_unset(&pyapiLock, "pyapi.evaluate");
- if (ptr == NULL)
+ if (msg != MAL_SUCCEED)
{
- msg = createException(MAL, "pyapi.eval", "Failed to initialize shared memory");
GDKfree(pids);
process_id = 0;
goto wrapup;
@@ -613,16 +613,17 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
{
//a child failed, get the error message from the child
ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[failedprocess * pci->retc + 0]);
+ char *err_ptr;
- char *err_ptr = get_shared_memory(shm_id + 1, descr->bat_size);
- if (err_ptr != NULL)
- {
- msg = createException(MAL, "pyapi.eval", "%s", err_ptr);
- release_shared_memory(err_ptr);
- }
- else
- {
- msg = createException(MAL, "pyapi.eval", "Error in child process, but no exception was thrown.");
+ if (descr->bat_size == 0) {
+ msg = createException(MAL, "pyapi.eval", "Failure in child process with unknown error.");
+ } else {
+ msg = get_shared_memory(shm_id + 1, descr->bat_size, (void**) &err_ptr);
+ if (msg == MAL_SUCCEED)
+ {
+ msg = createException(MAL, "pyapi.eval", "%s", err_ptr);
+ release_shared_memory(err_ptr);
+ }
}
if (process_count > 1) release_process_semaphore(sem_id);
@@ -670,13 +671,13 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
//get the shared memory address for this return value
VERBOSE_MESSAGE("Parent requesting memory at id %d of size %d\n", shm_id + (i + 1), total_size);
+ assert(total_size > 0);
MT_lock_set(&pyapiLock, "pyapi.evaluate");
- ret->array_data = get_shared_memory(shm_id + (i + 1), total_size);
+ msg = get_shared_memory(shm_id + (i + 1), total_size, &ret->array_data);
MT_lock_unset(&pyapiLock, "pyapi.evaluate");
- if (ret->array_data == NULL)
+ if (msg != MAL_SUCCEED)
{
- msg = createException(MAL, "pyapi.eval", "Shared memory does not exist.\n");
if (process_count > 1) release_process_semaphore(sem_id);
release_shared_memory(ptr);
GDKfree(pids);
@@ -690,13 +691,13 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
{
int mask_size = ret->count * sizeof(bool);
+ assert(mask_size > 0);
MT_lock_set(&pyapiLock, "pyapi.evaluate");
- ret->mask_data = get_shared_memory(shm_id + pci->retc + (i + 1), mask_size);
+ msg = get_shared_memory(shm_id + pci->retc + (i + 1), mask_size, (void**) &ret->mask_data);
MT_lock_unset(&pyapiLock, "pyapi.evaluate");
- if (ret->mask_data == NULL)
+ if (msg != MAL_SUCCEED)
{
- msg = createException(MAL, "pyapi.eval", "Shared memory does not exist.\n");
if (process_count > 1)
release_process_semaphore(sem_id);
release_shared_memory(ptr);
release_shared_memory(ret->array_data);
@@ -869,9 +870,8 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
// First we will fill in the header information, we will need to get a pointer to the header data first
// The main process has already created the header data for all the child processes
VERBOSE_MESSAGE("Getting shared memory.\n");
- shm_ptr = get_shared_memory(shm_id, memory_size);
- if (shm_ptr == NULL) {
- msg = createException(MAL, "pyapi.eval", "Failed to allocate shared memory for header data.\n");
+ msg = get_shared_memory(shm_id, memory_size, (void**) &shm_ptr);
+ if (msg != MAL_SUCCEED) {
goto wrapup;
}
@@ -952,16 +952,18 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
mask_size += descr->bat_count * sizeof(bool);
has_mask = has_mask || descr->has_mask;
}
+ assert(return_size > 0);
// Then we allocate the shared memory for this return value
VERBOSE_MESSAGE("Child creating shared memory at id %d of size %d\n", shm_id + (i + 1), return_size);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list