Changeset: 2c84d2fd9ba6 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=2c84d2fd9ba6
Modified Files:
gdk/Makefile.ag
gdk/gdk.h
gdk/gdk_heap.c
gdk/shared_memory.c
gdk/shared_memory.h
monetdb5/extras/pyapi/connection.c
monetdb5/extras/pyapi/lazyarray.c
monetdb5/extras/pyapi/pyapi.c
monetdb5/extras/pyapi/pytypes.h
Branch: pyapi
Log Message:
Reworked data transfer between separate processes; now always uses mmaped files
instead of shared memory, uses GDKmmap to create the files and correctly
deletes the files.
diffs (truncated from 1169 to 300 lines):
diff --git a/gdk/Makefile.ag b/gdk/Makefile.ag
--- a/gdk/Makefile.ag
+++ b/gdk/Makefile.ag
@@ -31,7 +31,7 @@ lib_gdk = {
gdk_unique.c \
gdk_firstn.c \
bat.feps bat1.feps bat2.feps \
- libbat.rc shared_memory.h shared_memory.c
+ libbat.rc shared_memory.c shared_memory.h
LIBS = ../common/options/libmoptions \
../common/stream/libstream \
../common/utils/libmutils \
diff --git a/gdk/gdk.h b/gdk/gdk.h
--- a/gdk/gdk.h
+++ b/gdk/gdk.h
@@ -641,13 +641,13 @@ typedef enum { GDK_FAIL, GDK_SUCCEED } g
/* Heap storage modes */
typedef enum {
- STORE_MEM = 0, /* load into GDKmalloced memory */
- STORE_MMAP = 1, /* mmap() into virtual memory */
- STORE_PRIV = 2, /* BAT copy of copy-on-write mmap */
- STORE_CMEM = 3, /* Indicates the value is stored in regular C
memory rather than GDK memory.*/
- STORE_NOWN = 4, /* Indicates that the bat does not own the chunk of
memory and is not in charge of freeing it.*/
- STORE_SHARED = 5, /* Indicattes that the bat uses shared memory. */
- STORE_INVALID /* invalid value, used to indicate error */
+ STORE_MEM = 0, /* load into GDKmalloced memory */
+ STORE_MMAP = 1, /* mmap() into virtual memory */
+ STORE_PRIV = 2, /* BAT copy of copy-on-write mmap */
+ STORE_CMEM = 3, /* Indicates the value is stored in regular C
memory rather than GDK memory.*/
+ STORE_NOWN = 4, /* Indicates that the bat does not own the
chunk of memory and is not in charge of freeing it.*/
+ STORE_MMAPABS = 5, /* mmap() into virtual memory from an absolute
path (not part of dbfarm) */
+ STORE_INVALID /* invalid value, used to indicate error */
} storage_t;
typedef struct {
diff --git a/gdk/gdk_heap.c b/gdk/gdk_heap.c
--- a/gdk/gdk_heap.c
+++ b/gdk/gdk_heap.c
@@ -47,7 +47,6 @@
#include "monetdb_config.h"
#include "gdk.h"
#include "gdk_private.h"
-#include "shared_memory.h"
static void *
HEAPcreatefile(int farmid, size_t *maxsz, const char *fn)
@@ -566,16 +565,9 @@ HEAPfree(Heap *h, int remove)
h->size, PTRFMTCAST h->base);
GDKfree(h->base);
} else if (h->storage == STORE_CMEM) {
- //heap is stored in regular C memory rather than GDK
memory
+ //heap is stored in regular C memory rather than GDK
memory,so we call free()
free(h->base);
- }
-#ifdef HAVE_FORK
- else if (h->storage == STORE_SHARED)
- {
- release_shared_memory(h->base);
- }
-#endif
- else { /* mapped file, or STORE_PRIV */
+ } else { /* mapped file, or STORE_PRIV */
gdk_return ret = GDKmunmap(h->base, h->size);
if (ret != GDK_SUCCEED) {
@@ -589,6 +581,16 @@ HEAPfree(Heap *h, int remove)
h->size, (int) ret);
}
}
+#ifdef HAVE_FORK
+ if (h->storage == STORE_MMAPABS) {
+ // heap is stored in a mmap() file, but h->filename points to
the absolute path
+ if (h->filename && unlink(h->filename) < 0 && errno != ENOENT) {
+ perror(h->filename);
+ }
+ GDKfree(h->filename);
+ h->filename = NULL;
+ }
+#endif
h->base = NULL;
if (h->filename) {
if (remove) {
diff --git a/gdk/shared_memory.c b/gdk/shared_memory.c
--- a/gdk/shared_memory.c
+++ b/gdk/shared_memory.c
@@ -1,12 +1,11 @@
#include "shared_memory.h"
-#ifndef _WIN32
+#ifdef HAVE_FORK
#include "gdk.h"
#include "gdk_private.h"
#include "../monetdb5/mal/mal_exception.h"
-#include "mutils.h"
#include <stdlib.h>
#include <assert.h>
@@ -25,307 +24,88 @@
#include <sys/sem.h>
#include <time.h>
-static lng *shm_memory_ids;
-static void **shm_ptrs;
-static int shm_unique_id = 1;
-static int shm_current_id = 0;
-static int shm_max_id = 32;
-static int shm_is_initialized = false;
-static MT_Lock release_memory_lock;
+static size_t shm_unique_id = 1;
static key_t base_key = 800000000;
-int memtype = SHM_SHARED;
-
-str init_shared_memory(int id, size_t size, void **return_ptr, int flags, bool
reg, lng *return_shmid);
-void store_shared_memory(lng memory_id, void *ptr);
-str release_shared_memory_id(int memory_id, void *ptr);
-
-str init_mmap_memory(int id, size_t size, void **return_ptr, int flags, bool
reg, lng *return_shmid);
-str release_mmap_memory(void *ptr, size_t size);
-
+str ftok_enhanced(int id, key_t *return_key);
str init_process_semaphore(int id, int count, int flags, int *semid);
-str initialize_shared_memory(void)
-{
- if (shm_is_initialized) //maybe this should just return MAL_SUCCEED as
well
- return createException(MAL, "shared_memory.init", "Attempting to
initialize shared memory when it was already initialized.");
+size_t get_unique_id(size_t offset) {
+ size_t id;
- //initialize the pointer to memory ID structure
- shm_ptrs = malloc(shm_max_id * sizeof(void*));
- shm_memory_ids = malloc(shm_max_id * sizeof(lng));
- shm_current_id = 0;
- shm_max_id = 32;
- shm_unique_id = 2;
+ id = shm_unique_id;
+ shm_unique_id += offset;
+ return id;
+}
- MT_lock_init(&release_memory_lock, "release_memory_lock");
-
- shm_is_initialized = true;
+str snprintf_mmap_file(str file, size_t max, size_t id) {
+ snprintf(file, max, "pymmap%zu", id);
return MAL_SUCCEED;
}
-void store_shared_memory(lng memory_id, void *ptr)
-{
- int i;
-
- assert(shm_is_initialized);
-
-
- for(i = 0; i < shm_current_id; i++)
- {
- if (shm_ptrs[i] == NULL)
- {
- shm_memory_ids[i] = memory_id;
- shm_ptrs[i] = ptr;
- return;
- }
- }
-
- if (shm_current_id >= shm_max_id)
- {
- void **new_ptrs = malloc(shm_max_id * 2 * sizeof(void*));
- lng *new_memory_ids = malloc(shm_max_id * 2 * sizeof(lng));
-
- memcpy(new_ptrs, shm_ptrs, sizeof(void*) * shm_max_id);
- memcpy(new_memory_ids, shm_memory_ids, sizeof(lng) *
shm_max_id);
-
- free(shm_ptrs); free(shm_memory_ids);
- shm_ptrs = new_ptrs; shm_memory_ids = new_memory_ids;
- shm_max_id *= 2;
- }
-
- shm_memory_ids[shm_current_id] = memory_id;
- shm_ptrs[shm_current_id] = ptr;
- shm_current_id++;
-}
-
-int get_unique_shared_memory_id(int offset)
-{
- int id;
- assert(shm_is_initialized);
-
- id = shm_unique_id;
- shm_unique_id += offset;
- return id;
-}
-
-str init_mmap_memory(int id, size_t size, void **return_ptr, int flags, bool
reg, lng *return_shmid)
-{
+str init_mmap_memory(size_t base_id, size_t id_offset, size_t maxsize, void
***return_ptr, size_t **return_size) {
char address[100];
void *ptr;
- int fd, result;
- // TODO: memmap shouldn't be in tmp directory
- // TODO: we should just use GDKmmap, try to get that to work
- snprintf(address, 100, "/tmp/temp_pyapi_mmap_%d", id);
+ int fd;
+ size_t size = maxsize;
+ int mod = MMAP_READ | MMAP_WRITE | MMAP_SEQUENTIAL | MMAP_SYNC |
MAP_SHARED;
+ char *path = NULL;
+ snprintf_mmap_file(address, 100, base_id + id_offset);
- fd = open(address, flags | O_RDWR, MONETDB_MODE);
+ /* round up to multiple of GDK_mmap_pagesize with a
+ * minimum of one
+ size = (maxsize + GDK_mmap_pagesize - 1) & ~(GDK_mmap_pagesize - 1);
+ if (size == 0)
+ size = GDK_mmap_pagesize; */
+ fd = GDKfdlocate(NOFARM, address, "wb", "tmp");
if (fd < 0) {
- char *err = strerror(errno);
- errno = 0;
- close(fd);
- return createException(MAL, "shared_memory.get", "Could not create
mmap file %s: %s", address, err);
+ return createException(MAL, "shared_memory.get", "Failure in
GDKfdlocate(NOFARM, %s, \"wb\", NULL)", address);
}
- if (flags != 0) {
- result = lseek(fd, size - 1, SEEK_SET);
- if (result == -1) {
- char *err = strerror(errno);
- errno = 0;
- close(fd);
- return createException(MAL, "shared_memory.get", "Failed to extend
mmap file: %s", err);
- }
- result = write(fd, "", 1);
- if (result != 1) {
- char *err = strerror(errno);
- errno = 0;
- close(fd);
- return createException(MAL, "shared_memory.get", "Failed to write
to mmap file: %s", err);
- }
+ close(fd);
+ path = GDKfilepath(NOFARM, BATDIR, address, "tmp");
+ if (path == NULL) {
+ return createException(MAL, "shared_memory.get", "Failure in
GDKfilepath(NOFARM, "BATDIR",%s,\"tmp\")", address);
}
- ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
- close(fd);
- if (ptr == (void*) -1) {
- char *err = strerror(errno);
- errno = 0;
- return createException(MAL, "shared_memory.get", "Failure in
mmap(NULL, %zu, PROT_WRITE, MAP_SHARED, %d, 0): %s", size, fd, err);
+ if (GDKextend(path, size) != GDK_SUCCEED) {
+ return createException(MAL, "shared_memory.get", "Failure in
GDKextend(%s,%zu)", path, size);
}
- if (reg) store_shared_memory(size, ptr);
- if (return_ptr != NULL) *return_ptr = ptr;
- if (return_shmid != NULL) *return_shmid = id;
+ ptr = GDKmmap(path, mod, size);
+ if (ptr == NULL) {
+ return createException(MAL, "shared_memory.get", "Failure in
GDKmmap(%s, %d, %zu)", path, mod, size);
+ }
+ GDKfree(path);
+ if (return_ptr != NULL) (*return_ptr)[id_offset] = ptr;
+ if (return_size != NULL) (*return_size)[id_offset] = size;
return MAL_SUCCEED;
}
-str release_mmap_memory(void *ptr, size_t size)
-{
+str release_mmap_memory(void *ptr, size_t size, size_t id) {
+ char address[100];
+ char *path;
int ret;
- // TODO: Actually delete files on disk
- ret = munmap(ptr, size);
- if (ret != 0) {
- char *err = strerror(errno);
- errno = 0;
- return createException(MAL, "shared_memory.release_mmap_memory",
"Failure in munmap(%p,%zu): %s", ptr, size, err);
+ snprintf_mmap_file(address, 100, id);
+ if (GDKmunmap(ptr, size) != GDK_SUCCEED) {
+ return createException(MAL, "shared_memory.get", "Failure in
GDKmunmap(%p, %zu)", ptr, size);
+ }
+ path = GDKfilepath(NOFARM, BATDIR, address, "tmp");
+ if (path == NULL) {
+ return createException(MAL, "shared_memory.get", "Failure in
GDKfilepath(NOFARM, "BATDIR",%s,\"tmp\")", address);
+ }
+ ret = remove(path);
+ GDKfree(path);
+ if (ret < 0) {
+ perror(strerror(errno));
+ return createException(MAL, "shared_memory.get", "Failure in
remove(%s)", path);
}
return MAL_SUCCEED;
}
-str release_shared_memory_ptr(void *ptr)
-{
- if (shmdt(ptr) == -1)
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list