Changeset: 08d8961ace58 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=08d8961ace58
Modified Files:
gdk/shared_memory.c
gdk/shared_memory.h
monetdb5/extras/pyapi/Benchmarks/pyapi_test.sh
monetdb5/extras/pyapi/pyapi.c
monetdb5/extras/pyapi/pyapi.h
monetdb5/extras/pyapi/pyapi.mal
Branch: pyapi
Log Message:
Merge.
diffs (truncated from 475 to 300 lines):
diff --git a/gdk/shared_memory.c b/gdk/shared_memory.c
--- a/gdk/shared_memory.c
+++ b/gdk/shared_memory.c
@@ -3,8 +3,7 @@
#ifndef _WIN32
-#include "monetdb_config.h"
-#include "gdk.h"
+#include "../monetdb5/mal/mal_exception.h"
#include <stdlib.h>
#include <assert.h>
@@ -28,25 +27,33 @@ static int shm_unique_id = 1;
static int shm_current_id = 0;
static int shm_max_id = 32;
static int shm_is_initialized = false;
-static char shm_keystring[] = ".";
+static char shm_keystring[] = BINDIR;
+static MT_Lock release_memory_lock;
+static key_t base_key = 800000000;
-void *init_shared_memory(int id, size_t size, int flags);
+
+str init_shared_memory(int id, size_t size, void **ptr, int flags);
void store_shared_memory(int memory_id, void *ptr);
-int release_shared_memory_id(int memory_id, void *ptr);
+str release_shared_memory_id(int memory_id, void *ptr);
int init_process_semaphore(int id, int count, int flags);
-void initialize_shared_memory(void)
+str initialize_shared_memory(void)
{
- if (shm_is_initialized) return;
-
+ if (shm_is_initialized) //maybe this should just return MAL_SUCCEED as well
+ return createException(MAL, "shared_memory.init", "Attempting to initialize shared memory when it was already initialized.");
+
+ //initialize the pointer to memory ID structure
shm_ptrs = malloc(shm_max_id * sizeof(void*));
shm_memory_ids = malloc(shm_max_id * sizeof(int));
shm_current_id = 0;
shm_max_id = 32;
shm_unique_id = 2;
- shm_is_initialized = true;
+ MT_lock_init(&release_memory_lock, "release_memory_lock");
+
+ shm_is_initialized = true;
+ return MAL_SUCCEED;
}
void store_shared_memory(int memory_id, void *ptr)
@@ -79,7 +86,6 @@ void store_shared_memory(int memory_id,
shm_max_id *= 2;
}
-
shm_memory_ids[shm_current_id] = memory_id;
shm_ptrs[shm_current_id] = ptr;
shm_current_id++;
@@ -95,26 +101,34 @@ int get_unique_shared_memory_id(int offs
return id;
}
-void* create_shared_memory(int id, size_t size)
+str create_shared_memory(int id, size_t size, void **return_ptr)
{
- return init_shared_memory(id, size, IPC_CREAT);
+ return init_shared_memory(id, size, return_ptr, IPC_CREAT);
}
-void *get_shared_memory(int id, size_t size)
+str get_shared_memory(int id, size_t size, void **return_ptr)
{
- return init_shared_memory(id, size, 0);
+ return init_shared_memory(id, size, return_ptr, 0);
}
-void *init_shared_memory(int id, size_t size, int flags)
+str ftok_enhanced(int id, key_t *return_key);
+str ftok_enhanced(int id, key_t *return_key)
+{
+ *return_key = base_key + id;
+ return MAL_SUCCEED;
+}
+
+str init_shared_memory(int id, size_t size, void **return_ptr, int flags)
{
int shmid;
void *ptr;
int i;
- int key = ftok(shm_keystring, id);
- if (key == (key_t) -1)
+ key_t key;
+
+ str msg = ftok_enhanced(id, &key);
+ if (msg != MAL_SUCCEED)
{
- perror("ftok");
- return NULL;
+ return msg;
}
assert(shm_is_initialized);
@@ -122,8 +136,9 @@ void *init_shared_memory(int id, size_t
shmid = shmget(key, size, flags | 0666);
if (shmid < 0)
{
- perror("shmget");
- return NULL;
+ char *err = strerror(errno);
+ errno = 0;
+ return createException(MAL, "shared_memory.get", "Error calling shmget(key:%d,size:%zu,flags:%d): %s", key, size, flags, err);
}
//check if the shared memory segment is already created, if it is we do not need to add it to the table and can simply return the pointer
@@ -131,28 +146,32 @@ void *init_shared_memory(int id, size_t
{
if (shm_memory_ids[i] == shmid)
{
- return shm_ptrs[i];
+ if (return_ptr != NULL) *return_ptr = shm_ptrs[i];
+ return MAL_SUCCEED;
}
}
ptr = shmat(shmid, NULL, 0);
if (ptr == (void*)-1)
{
- perror("shmat");
- return NULL;
+ char *err = strerror(errno);
+ errno = 0;
+ return createException(MAL, "shared_memory.get", "Error calling shmat(id:%d,NULL,0): %s", shmid, err);
}
store_shared_memory(shmid, ptr);
- return ptr;
+ if (return_ptr != NULL) *return_ptr = ptr;
+ return MAL_SUCCEED;
}
-int release_shared_memory(void *ptr)
+str release_shared_memory(void *ptr)
{
int i = 0;
int memory_id = -1;
assert(shm_is_initialized);
+ MT_lock_set(&release_memory_lock, "release_memory_lock");
//find the memory_id accompanying the given pointer in the structure
for(i = 0; i < shm_current_id; i++)
{
@@ -164,25 +183,28 @@ int release_shared_memory(void *ptr)
break;
}
}
+ MT_lock_unset(&release_memory_lock, "release_memory_lock");
assert(memory_id);
//actually release the memory at the given ID
return release_shared_memory_id(memory_id, ptr);
}
-int release_shared_memory_id(int memory_id, void *ptr)
+str release_shared_memory_id(int memory_id, void *ptr)
{
if (shmctl(memory_id, IPC_RMID, NULL) == -1)
{
- perror("shmctl");
- return false;
+ char *err = strerror(errno);
+ errno = 0;
+ return createException(MAL, "shared_memory.release", "Error calling shmctl(id:%d,IPC_RMID,NULL): %s", memory_id, err);
}
if (shmdt(ptr) == -1)
{
- perror("shmdt");
- return false;
+ char *err = strerror(errno);
+ errno = 0;
+ return createException(MAL, "shared_memory.release", "Error calling shmdt(ptr:%p): %s", ptr, err);
}
- return true;
+ return MAL_SUCCEED;
}
int init_process_semaphore(int id, int count, int flags)
diff --git a/gdk/shared_memory.h b/gdk/shared_memory.h
--- a/gdk/shared_memory.h
+++ b/gdk/shared_memory.h
@@ -14,18 +14,21 @@
#ifndef _SHAREDMEMORY_LIB_
#define _SHAREDMEMORY_LIB_
+#include "monetdb_config.h"
+#include "gdk.h"
+
#include <stddef.h>
//! Initialize the shared memory module
-void initialize_shared_memory(void);
+str initialize_shared_memory(void);
//! Not thread safe
-void* create_shared_memory(int id, size_t size);
-//! Not thread safe
-int release_shared_memory(void *ptr);
+str create_shared_memory(int id, size_t size, void **return_ptr);
+//! This is thread safe
+str release_shared_memory(void *ptr);
//! Not thread safe
int get_unique_shared_memory_id(int offset);
//! This is thread safe
-void *get_shared_memory(int id, size_t size);
+str get_shared_memory(int id, size_t size, void **return_ptr);
//! Returns semaphore ID, id = unique id within the program, count = amount of semaphores
int create_process_semaphore(int id, int count);
diff --git a/monetdb5/extras/pyapi/Benchmarks/pyapi_test.sh b/monetdb5/extras/pyapi/Benchmarks/pyapi_test.sh
--- a/monetdb5/extras/pyapi/Benchmarks/pyapi_test.sh
+++ b/monetdb5/extras/pyapi/Benchmarks/pyapi_test.sh
@@ -11,7 +11,7 @@ export MSERVERTEST='netstat -ant | grep
# Testing parameters
# Input test (zero copy vs copy)
# The input sizes to test (in MB)
-export INPUT_TESTING_SIZES="0.1 1 10 100 1000"
+export INPUT_TESTING_SIZES="10"
# Amount of tests to run for each size
export INPUT_TESTING_NTESTS=10
@@ -130,6 +130,11 @@ function pyapi_build() {
fi
}
+function pyapi_run_single_test_echo() {
+ echo \$PYAPI_BUILD_DIR/bin/mserver5 --set mapi_port=\$MSERVER_PORT --set embedded_py=true --set enable_pyverbose=true --set pyapi_benchmark_output=\$PYAPI_OUTPUT_DIR/temp_output.tsv $2
+ echo python \$PYAPI_TESTFILE $3 $4 $5 \$MSERVER_PORT $6
+}
+
function pyapi_run_single_test() {
echo "Beginning Test $1"
if [ $SETSID -eq 1 ]; then
@@ -155,6 +160,7 @@ function pyapi_run_single_test() {
return 1
}
+
function pyapi_test_input() {
echo "Beginning Input Testing (Copy vs Zero Copy)"
pyapi_run_single_test "Input Testing (Zero Copy)" "" "INPUT" input_zerocopy "$INPUT_TESTING_NTESTS" "$INPUT_TESTING_SIZES"
diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c
--- a/monetdb5/extras/pyapi/pyapi.c
+++ b/monetdb5/extras/pyapi/pyapi.c
@@ -532,13 +532,13 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
VERBOSE_MESSAGE("Initializing shared memory.\n");
+ assert(memory_size > 0);
//create the shared memory for the header
MT_lock_set(&pyapiLock, "pyapi.evaluate");
- ptr = create_shared_memory(shm_id, memory_size);
+ msg = create_shared_memory(shm_id, memory_size, (void**) &ptr);
MT_lock_unset(&pyapiLock, "pyapi.evaluate");
- if (ptr == NULL)
+ if (msg != MAL_SUCCEED)
{
- msg = createException(MAL, "pyapi.eval", "Failed to initialize shared memory");
GDKfree(pids);
process_id = 0;
goto wrapup;
@@ -613,16 +613,19 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
{
//a child failed, get the error message from the child
ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[failedprocess * pci->retc + 0]);
+ char *err_ptr;
- char *err_ptr = get_shared_memory(shm_id + 1, descr->bat_size);
- if (err_ptr != NULL)
- {
- msg = createException(MAL, "pyapi.eval", "%s", err_ptr);
- release_shared_memory(err_ptr);
- }
- else
- {
- msg = createException(MAL, "pyapi.eval", "Error in child process, but no exception was thrown.");
+ if (descr->bat_size == 0) {
+ msg = createException(MAL, "pyapi.eval", "Failure in child process with unknown error.");
+ } else {
+ MT_lock_set(&pyapiLock, "pyapi.evaluate");
+ msg = get_shared_memory(shm_id + 1, descr->bat_size, (void**) &err_ptr);
+ MT_lock_unset(&pyapiLock, "pyapi.evaluate");
+ if (msg == MAL_SUCCEED)
+ {
+ msg = createException(MAL, "pyapi.eval", "%s", err_ptr);
+ release_shared_memory(err_ptr);
+ }
}
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list