Changeset: 2c84d2fd9ba6 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=2c84d2fd9ba6
Modified Files:
        gdk/Makefile.ag
        gdk/gdk.h
        gdk/gdk_heap.c
        gdk/shared_memory.c
        gdk/shared_memory.h
        monetdb5/extras/pyapi/connection.c
        monetdb5/extras/pyapi/lazyarray.c
        monetdb5/extras/pyapi/pyapi.c
        monetdb5/extras/pyapi/pytypes.h
Branch: pyapi
Log Message:

Reworked data transfer between separate processes; now always uses mmaped files 
instead of shared memory, uses GDKmmap to create the files and correctly 
deletes the files.


diffs (truncated from 1169 to 300 lines):

diff --git a/gdk/Makefile.ag b/gdk/Makefile.ag
--- a/gdk/Makefile.ag
+++ b/gdk/Makefile.ag
@@ -31,7 +31,7 @@ lib_gdk = {
                gdk_unique.c \
                gdk_firstn.c \
                bat.feps bat1.feps bat2.feps \
-               libbat.rc shared_memory.h shared_memory.c
+               libbat.rc shared_memory.c shared_memory.h
        LIBS = ../common/options/libmoptions \
                ../common/stream/libstream \
                ../common/utils/libmutils \
diff --git a/gdk/gdk.h b/gdk/gdk.h
--- a/gdk/gdk.h
+++ b/gdk/gdk.h
@@ -641,13 +641,13 @@ typedef enum { GDK_FAIL, GDK_SUCCEED } g
 
 /* Heap storage modes */
 typedef enum {
-       STORE_MEM = 0,          /* load into GDKmalloced memory */
-       STORE_MMAP = 1,         /* mmap() into virtual memory */
-       STORE_PRIV = 2,         /* BAT copy of copy-on-write mmap */
-       STORE_CMEM = 3,     /* Indicates the value is stored in regular C 
memory rather than GDK memory.*/
-       STORE_NOWN = 4,     /* Indicates that the bat does not own the chunk of 
memory and is not in charge of freeing it.*/
-       STORE_SHARED = 5,   /* Indicattes that the bat uses shared memory. */
-       STORE_INVALID           /* invalid value, used to indicate error */
+       STORE_MEM     = 0,              /* load into GDKmalloced memory */
+       STORE_MMAP    = 1,              /* mmap() into virtual memory */
+       STORE_PRIV    = 2,              /* BAT copy of copy-on-write mmap */
+       STORE_CMEM    = 3,      /* Indicates the value is stored in regular C 
memory rather than GDK memory.*/
+       STORE_NOWN    = 4,      /* Indicates that the bat does not own the 
chunk of memory and is not in charge of freeing it.*/
+    STORE_MMAPABS = 5,      /* mmap() into virtual memory from an absolute 
path (not part of dbfarm) */
+       STORE_INVALID               /* invalid value, used to indicate error */
 } storage_t;
 
 typedef struct {
diff --git a/gdk/gdk_heap.c b/gdk/gdk_heap.c
--- a/gdk/gdk_heap.c
+++ b/gdk/gdk_heap.c
@@ -47,7 +47,6 @@
 #include "monetdb_config.h"
 #include "gdk.h"
 #include "gdk_private.h"
-#include "shared_memory.h"
 
 static void *
 HEAPcreatefile(int farmid, size_t *maxsz, const char *fn)
@@ -566,16 +565,9 @@ HEAPfree(Heap *h, int remove)
                                          h->size, PTRFMTCAST h->base);
                        GDKfree(h->base);
                } else if (h->storage == STORE_CMEM) {
-                       //heap is stored in regular C memory rather than GDK 
memory
+                       //heap is stored in regular C memory rather than GDK 
memory,so we call free()
                        free(h->base);
-               }
-#ifdef HAVE_FORK 
-               else if (h->storage == STORE_SHARED)
-               {
-                       release_shared_memory(h->base);
-               }
-#endif
-               else {  /* mapped file, or STORE_PRIV */
+               } else {        /* mapped file, or STORE_PRIV */
                        gdk_return ret = GDKmunmap(h->base, h->size);
 
                        if (ret != GDK_SUCCEED) {
@@ -589,6 +581,16 @@ HEAPfree(Heap *h, int remove)
                                          h->size, (int) ret);
                }
        }
+#ifdef HAVE_FORK
+       if (h->storage == STORE_MMAPABS)  { 
+               // heap is stored in a mmap() file, but h->filename points to 
the absolute path
+               if (h->filename && unlink(h->filename) < 0 && errno != ENOENT) {
+                       perror(h->filename);
+               }
+               GDKfree(h->filename);
+               h->filename = NULL;
+       }
+#endif
        h->base = NULL;
        if (h->filename) {
                if (remove) {
diff --git a/gdk/shared_memory.c b/gdk/shared_memory.c
--- a/gdk/shared_memory.c
+++ b/gdk/shared_memory.c
@@ -1,12 +1,11 @@
 
 #include "shared_memory.h"
 
-#ifndef _WIN32
+#ifdef HAVE_FORK
 
 #include "gdk.h"
 #include "gdk_private.h"
 #include "../monetdb5/mal/mal_exception.h"
-#include "mutils.h"
 
 #include <stdlib.h>
 #include <assert.h>
@@ -25,307 +24,88 @@
 #include <sys/sem.h>
 #include <time.h>
 
-static lng *shm_memory_ids;
-static void **shm_ptrs;
-static int shm_unique_id = 1;
-static int shm_current_id = 0;
-static int shm_max_id = 32;
-static int shm_is_initialized = false;
-static MT_Lock release_memory_lock;
+static size_t shm_unique_id = 1;
 static key_t base_key = 800000000;
 
-int memtype = SHM_SHARED;
-
-str init_shared_memory(int id, size_t size, void **return_ptr, int flags, bool 
reg, lng *return_shmid);
-void store_shared_memory(lng memory_id, void *ptr);
-str release_shared_memory_id(int memory_id, void *ptr);
-
-str init_mmap_memory(int id, size_t size, void **return_ptr, int flags, bool 
reg, lng *return_shmid);
-str release_mmap_memory(void *ptr, size_t size);
-
+str ftok_enhanced(int id, key_t *return_key);
 str init_process_semaphore(int id, int count, int flags, int *semid);
 
-str initialize_shared_memory(void)
-{
-       if (shm_is_initialized) //maybe this should just return MAL_SUCCEED as 
well
-        return createException(MAL, "shared_memory.init", "Attempting to 
initialize shared memory when it was already initialized.");
+size_t get_unique_id(size_t offset) {
+    size_t id;
 
-    //initialize the pointer to memory ID structure
-       shm_ptrs = malloc(shm_max_id * sizeof(void*));
-       shm_memory_ids = malloc(shm_max_id * sizeof(lng));
-       shm_current_id = 0;
-       shm_max_id = 32;
-       shm_unique_id = 2;
+    id = shm_unique_id;
+    shm_unique_id += offset;
+    return id;
+}
 
-    MT_lock_init(&release_memory_lock, "release_memory_lock");
-
-    shm_is_initialized = true;
+str snprintf_mmap_file(str file, size_t max, size_t id) {
+    snprintf(file, max, "pymmap%zu", id);
     return MAL_SUCCEED;
 }
 
-void store_shared_memory(lng memory_id, void *ptr)
-{
-    int i;
-
-    assert(shm_is_initialized);
-
-
-    for(i = 0; i < shm_current_id; i++)
-    {
-        if (shm_ptrs[i] == NULL)
-        {
-            shm_memory_ids[i] = memory_id;
-            shm_ptrs[i] = ptr;
-            return;
-        }
-    }
-
-       if (shm_current_id >= shm_max_id)
-       {
-               void **new_ptrs = malloc(shm_max_id * 2 * sizeof(void*));
-               lng *new_memory_ids = malloc(shm_max_id * 2 * sizeof(lng));
-
-               memcpy(new_ptrs, shm_ptrs, sizeof(void*) * shm_max_id);
-               memcpy(new_memory_ids, shm_memory_ids, sizeof(lng) * 
shm_max_id);
-
-               free(shm_ptrs); free(shm_memory_ids);
-               shm_ptrs = new_ptrs; shm_memory_ids = new_memory_ids;
-               shm_max_id *= 2;
-       }
-
-       shm_memory_ids[shm_current_id] = memory_id;
-       shm_ptrs[shm_current_id] = ptr;
-       shm_current_id++;
-}
-
-int get_unique_shared_memory_id(int offset)
-{
-    int id;
-    assert(shm_is_initialized);
-
-       id = shm_unique_id;
-       shm_unique_id += offset;
-       return id;
-}
-
-str init_mmap_memory(int id, size_t size, void **return_ptr, int flags, bool 
reg, lng *return_shmid)
-{   
+str init_mmap_memory(size_t base_id, size_t id_offset, size_t maxsize, void 
***return_ptr, size_t **return_size) {
     char address[100];
     void *ptr;
-    int fd, result;
-    // TODO: memmap shouldn't be in tmp directory
-    // TODO: we should just use GDKmmap, try to get that to work
-    snprintf(address, 100, "/tmp/temp_pyapi_mmap_%d", id);
+    int fd;
+    size_t size = maxsize;
+    int mod = MMAP_READ | MMAP_WRITE | MMAP_SEQUENTIAL | MMAP_SYNC  | 
MAP_SHARED;
+    char *path = NULL;
+    snprintf_mmap_file(address, 100, base_id + id_offset);
 
-    fd = open(address, flags | O_RDWR, MONETDB_MODE);
+    /* round up to multiple of GDK_mmap_pagesize with a
+     * minimum of one 
+    size = (maxsize + GDK_mmap_pagesize - 1) & ~(GDK_mmap_pagesize - 1);
+    if (size == 0)
+        size = GDK_mmap_pagesize; */
+    fd = GDKfdlocate(NOFARM, address, "wb", "tmp");
     if (fd < 0) {
-        char *err = strerror(errno);
-        errno = 0;
-        close(fd);
-        return createException(MAL, "shared_memory.get", "Could not create 
mmap file %s: %s", address, err);
+        return createException(MAL, "shared_memory.get", "Failure in 
GDKfdlocate(NOFARM, %s, \"wb\", NULL)", address);
     }
-    if (flags != 0) {
-        result = lseek(fd, size - 1, SEEK_SET);
-        if (result == -1) {
-            char *err = strerror(errno);
-            errno = 0;
-            close(fd);
-            return createException(MAL, "shared_memory.get", "Failed to extend 
mmap file: %s", err);
-        }
-        result = write(fd, "", 1);
-        if (result != 1) {
-            char *err = strerror(errno);
-            errno = 0;
-            close(fd);
-            return createException(MAL, "shared_memory.get", "Failed to write 
to mmap file: %s", err);
-        }
+    close(fd);
+    path = GDKfilepath(NOFARM, BATDIR, address, "tmp");
+    if (path == NULL) {
+        return createException(MAL, "shared_memory.get", "Failure in 
GDKfilepath(NOFARM, "BATDIR",%s,\"tmp\")", address);
     }
-    ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
-    close(fd);
-    if (ptr == (void*) -1) {
-        char *err = strerror(errno);
-        errno = 0;
-        return createException(MAL, "shared_memory.get", "Failure in 
mmap(NULL, %zu, PROT_WRITE, MAP_SHARED, %d, 0): %s", size, fd, err);
+    if (GDKextend(path, size) != GDK_SUCCEED) {
+        return createException(MAL, "shared_memory.get", "Failure in 
GDKextend(%s,%zu)", path, size);
     }
-    if (reg) store_shared_memory(size, ptr);
-    if (return_ptr != NULL) *return_ptr = ptr;
-    if (return_shmid != NULL) *return_shmid = id;
+    ptr = GDKmmap(path, mod, size);
+    if (ptr == NULL) {
+        return createException(MAL, "shared_memory.get", "Failure in 
GDKmmap(%s, %d, %zu)", path, mod, size);
+    }
+    GDKfree(path);
+    if (return_ptr != NULL) (*return_ptr)[id_offset] = ptr;
+    if (return_size != NULL) (*return_size)[id_offset] = size;
     return MAL_SUCCEED;
 }
 
-str release_mmap_memory(void *ptr, size_t size)
-{
+str release_mmap_memory(void *ptr, size_t size, size_t id) {
+    char address[100];
+    char *path;
     int ret;
-    // TODO: Actually delete files on disk
-    ret = munmap(ptr, size);
-    if (ret != 0) {
-        char *err = strerror(errno);
-        errno = 0;
-        return createException(MAL, "shared_memory.release_mmap_memory", 
"Failure in munmap(%p,%zu): %s", ptr, size, err);
+    snprintf_mmap_file(address, 100, id);
+    if (GDKmunmap(ptr, size) != GDK_SUCCEED) {
+        return createException(MAL, "shared_memory.get", "Failure in 
GDKmunmap(%p, %zu)", ptr, size);
+    }
+    path = GDKfilepath(NOFARM, BATDIR, address, "tmp");
+    if (path == NULL) {
+        return createException(MAL, "shared_memory.get", "Failure in 
GDKfilepath(NOFARM, "BATDIR",%s,\"tmp\")", address);
+    }
+    ret = remove(path);
+    GDKfree(path);
+    if (ret < 0) {
+        perror(strerror(errno));
+        return createException(MAL, "shared_memory.get", "Failure in 
remove(%s)", path);
     }
     return MAL_SUCCEED;
 }
 
-str release_shared_memory_ptr(void *ptr)
-{
-    if (shmdt(ptr) == -1)
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to