Am Montag, den 27.12.2010, 19:13 -0500 schrieb Joe Python:
>
>
> Python has a Parallel version of 'map' where we can offload work to
> os level processes (worker pool) and get the results
>
Find attached two files.
pthreads.scm: a pthread pool where you can offload work to.
sqlite3.scm: uses the pthread module.
sqlite3.scm is actually a showcase too: pthreads running sqlite3 can
call back to chicken (I'm using the vfs interface). (Which in turn is
used by ball.askemos.org to implement byzantine replication of the
SQLite interpreter).
Hope this helps
/Jörg
(declare
(unit pthreads)
(uses srfi-18 srfi-34 srfi-35)
(uses atomic) ; init thread extnsns
(uses library) ; gc
(fixnum-arithmetic)
(disable-interrupts)
(usual-integrations)
(no-bound-checks)
(no-procedure-checks-for-usual-bindings)
(bound-to-procedure
)
(foreign-declare #<<EOF
#ifdef ___CHICKEN
typedef C_word obj;
#define FALSE_OBJ C_SCHEME_FALSE
#else
#include <rscheme/obj.h>
#endif
#include <stdio.h>
#include <stdlib.h>
#ifndef NO_THREAD_LOOP
#define THREAD_POOL_SIZE 5
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
typedef int (*askemos_request_function_t)(void *);
typedef struct _askemos_pool_entry {
askemos_request_function_t function;
void *data;
void *callback;
} askemos_pool_entry_t;
struct askemos_pool {
pthread_mutex_t mutex;
pthread_cond_t has_job;
pthread_cond_t has_space;
unsigned short int total, next, free;
askemos_pool_entry_t *r;
};
static void *
worker_thread_loop(void *arg);
static void
askemos_pool_entry_init(struct askemos_pool * pool, askemos_pool_entry_t * r);
static int
askemos_pool_init(struct askemos_pool * pool)
{
int i;
pool->next = 0;
pool->total = pool->free = 50;
pthread_mutex_init(&pool->mutex, NULL);
pthread_cond_init(&pool->has_job, NULL);
pthread_cond_init(&pool->has_space, NULL);
pool->r = malloc(sizeof(askemos_pool_entry_t) * pool->total);
for (i = 0; i < pool->total; ++i) {
pool->r[i].function = NULL;
pool->r[i].data = NULL;
pool->r[i].callback = NULL;
}
for (i = 0; i < THREAD_POOL_SIZE; ++i) {
int e;
pthread_t thread;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, /* PTHREAD_CREATE_DETACHED */
PTHREAD_CREATE_JOINABLE);
e = pthread_create(&thread, &attr, worker_thread_loop, pool);
pthread_attr_destroy(&attr);
}
return 0;
}
static int
askemos_pool_send(struct askemos_pool * pool,
askemos_request_function_t function,
void *data,
void *callback)
{
askemos_pool_entry_t *result = NULL;
pthread_mutex_lock(&pool->mutex);
do {
if (pool->free) {
result = &pool->r[pool->next];
pool->next = (pool->next + 1) % pool->total;
--pool->free;
} else {
pthread_mutex_unlock(&pool->mutex);
return 1;
/*
fprintf(stderr, "DANGER: chicken waiting on thread pool space\n");
pthread_cond_wait(&pool->has_space, &pool->mutex);
*/
}
} while( result == NULL );
result->function = function;
result->data = data;
result->callback = callback;
pthread_mutex_unlock(&pool->mutex);
pthread_cond_signal(&pool->has_job);
return 0;
}
/* askemos_pool_put returns an entry into the queue (LI) and returns
* the result to rscheme. The latter is questionable, but we avoid to
* take yet another lock around the interpreter callback.
*/
static void
askemos_pool_receive(struct askemos_pool * pool,
askemos_request_function_t *function,
void **data,
void **callback)
{
askemos_pool_entry_t *result = NULL;
pthread_mutex_lock(&pool->mutex);
do {
if (pool->free != pool->total) {
unsigned short int target = (pool->next + pool->free) % pool->total;
result = &pool->r[target];
++pool->free;
*function = result->function;
*data = result->data;
*callback = result->callback;
} else {
pthread_cond_wait(&pool->has_job, &pool->mutex);
}
} while( result == NULL );
pthread_mutex_unlock(&pool->mutex);
pthread_cond_signal(&pool->has_space);
}
static struct askemos_pool *request_pool = NULL;
int
start_asynchronous_request(askemos_request_function_t function,
void *data, void *callback)
{
if( request_pool == NULL ) {
fprintf(stderr, "thread pool not initialised\n");
exit(1);
}
return askemos_pool_send(request_pool, function, data, callback);
}
static pthread_mutex_t callback_mutex;
#ifdef ___CHICKEN
static int the_interrupt_pipe[2] = {0, 0};
static void *the_callback = NULL;
static void *the_callback_result = NULL;
static C_word the_result = C_SCHEME_FALSE;
static void *integer_result = NULL;
static int C_notify_external_interrupt()
{
static char buf[1] = { (char) 254 };
if(write(the_interrupt_pipe[1], buf, 1)) ;
}
#endif
static void *
worker_thread_loop(void *arg)
{
struct askemos_pool *pool = arg;
askemos_request_function_t function;
void *data;
void *callback;
int result;
// pthread_cleanup_push(worker_thread_unlock, ressources);
while (1) {
askemos_pool_receive(request_pool, &function, &data, &callback);
result = (*function)(data);
pthread_mutex_lock(&callback_mutex);
the_result = result;
the_callback = callback;
the_callback_result = integer_result;
/* CHICKEN_interrupt(1); */
C_notify_external_interrupt();
}
// pthread_cleanup_pop(1);
return NULL;
}
#ifdef ___CHICKEN
void C_interrupt_call(void *callback, void *result, void* value) {
pthread_mutex_lock(&callback_mutex);
the_result = (C_word) value;
the_callback_result = result;
the_callback = callback;
/* CHICKEN_interrupt(1); */
C_notify_external_interrupt();
}
#endif
void
askemos_pre_init(void *intres)
{
pthread_mutex_init(&callback_mutex, NULL);
request_pool = malloc(sizeof(struct askemos_pool));
askemos_pool_init(request_pool);
integer_result=intres;
#ifdef ___CHICKEN
if( pipe(the_interrupt_pipe) == -1 )
fprintf(stderr, "Failed to open interrupt pipe\n");
#endif
}
#else /* NO_THREAD_LOOP */
typedef int (*askemos_request_function_t)(void *);
int
start_asynchronous_request(askemos_request_function_t function,
void *data, void *callback){}
void
askemos_pre_init(void *intres)
{
fprintf(stderr, "thread pool not initialised\n");
}
#endif
/*
int
test_askemos_thread_sleep(void *data)
{
int time = (int) data;
sleep(time);
return time;
}
*/
EOF
)
)
(module
pthreads
(
external-wait
pthread-pool-load
)
(import scheme (except chicken with-exception-handler condition?) foreign
lolevel extras
(except srfi-18 raise with-exception-handler) srfi-34 srfi-35 atomic)
(define pthread-pool-load (make-semaphore 'pthread-pool-load 40))
(define-foreign-variable interrupt-callback c-pointer "the_callback")
(define callback-result (foreign-lambda* int ((c-pointer result)) "return(* (int *) result);"))
(define callback-result-root (make-gc-root callback-result))
(define (make-gc-root obj)
((foreign-lambda*
c-pointer ((scheme-object obj))
"C_GC_ROOT *r=CHICKEN_new_gc_root();"
"CHICKEN_gc_root_set(r, obj);"
"return(r);")
obj))
((foreign-lambda* void ((c-pointer f)) "askemos_pre_init(f);")
callback-result-root)
(define (handle-callback)
(if interrupt-callback
(let ((cb ((foreign-lambda* scheme-object () "return(CHICKEN_gc_root_ref(the_callback));")))
(rc (((foreign-lambda* scheme-object () "return(CHICKEN_gc_root_ref(the_callback_result));"))
((foreign-lambda* c-pointer () "return(&the_result);")))))
(set! interrupt-callback #f)
((foreign-lambda* void ()
;; "CHICKEN_interrupt(0);"
"pthread_mutex_unlock(&callback_mutex);"))
(thread-start! (make-thread (lambda () (cb rc)) 'handle-callback)))
((foreign-lambda*
void ()
"fprintf(stderr, \"Ignored callback -- does this mess up things?\\n\");"))) )
(define external-wait
(thread-start!
(make-thread
(lambda ()
(let ((fd ((foreign-lambda* int () "return(the_interrupt_pipe[0]);"))))
(do ()
(#f)
(thread-wait-for-i/o! fd)
((foreign-lambda*
void ()
"static int buf[1]; int r = read(the_interrupt_pipe[0], buf, 1);"))
(handle-callback))))
"external-wait")))
) ;; module pthreads
;; (C) 2008, 2010 J�rg F. Wittenberger -*-Scheme-*-
(declare
(unit sqlite3)
(uses srfi-1 srfi-18 srfi-69 atomic pthreads)
(fixnum)
(usual-integrations)
(disable-interrupts)
(foreign-declare #<<EOF
typedef C_word obj;
#define FALSE_OBJ C_SCHEME_FALSE
#include "sqlite/sqlite3.h"
#include <assert.h>
static int rs_sqlite3_auth_unrestricted(void* userdata, int opcode,
const char* arg1, const char* arg2,
const char* dbname, const char* trigger)
{
return SQLITE_OK;
}
static void sqlite3_set_authorizer_unrestricted(sqlite3 *cnx)
{
sqlite3_set_authorizer(cnx, rs_sqlite3_auth_unrestricted, NULL);
}
static int rs_sqlite3_auth_restricted(void* userdata, int opcode,
const char* arg1, const char* arg2,
const char* dbname, const char* trigger)
{
switch(opcode) {
case SQLITE_CREATE_INDEX: /* Index Name Table Name */
case SQLITE_REINDEX:
case SQLITE_CREATE_TABLE: /* Table Name NULL */
case SQLITE_ALTER_TABLE: /* Database Name Table Name */
case SQLITE_CREATE_TEMP_INDEX: /* Index Name Table Name */
case SQLITE_CREATE_TEMP_TABLE: /* Table Name NULL */
case SQLITE_CREATE_TEMP_TRIGGER: /* Trigger Name Table Name */
case SQLITE_CREATE_TEMP_VIEW: /* View Name NULL */
case SQLITE_CREATE_TRIGGER: /* Trigger Name Table Name */
case SQLITE_CREATE_VIEW: /* View Name NULL */
case SQLITE_DELETE: /* Table Name NULL */
case SQLITE_DROP_INDEX: /* Index Name Table Name */
case SQLITE_DROP_TABLE: /* Table Name NULL */
case SQLITE_DROP_TEMP_INDEX: /* Index Name Table Name */
case SQLITE_DROP_TEMP_TABLE: /* Table Name NULL */
case SQLITE_DROP_TEMP_TRIGGER: /* Trigger Name Table Name */
case SQLITE_DROP_TEMP_VIEW: /* View Name NULL */
case SQLITE_DROP_TRIGGER: /* Trigger Name Table Name */
case SQLITE_DROP_VIEW: /* View Name NULL */
case SQLITE_INSERT: /* Table Name NULL */
case SQLITE_PRAGMA: /* Pragma Name 1st arg or NULL */
case SQLITE_READ: /* Table Name Column Name */
case SQLITE_SELECT: /* NULL NULL */
#if SQLITE_VERSION_NUMBER > 3003007
case SQLITE_FUNCTION: /* Function Name NULL */
#endif
case SQLITE_TRANSACTION: /* NULL NULL */
case SQLITE_UPDATE: /* Table Name Column Name */
return SQLITE_OK;
case SQLITE_ATTACH: /* Filename NULL */
case SQLITE_DETACH: /* Database Name NULL */
default:
fprintf(stderr, "auth_restricted deny %d\n", opcode);
return SQLITE_DENY;
}
}
static int rs_sqlite3_auth_restricted_ro(void* userdata, int opcode,
const char* arg1, const char* arg2,
const char* dbname, const char* trigger)
{
switch(opcode) {
case SQLITE_CREATE_INDEX: /* Index Name Table Name */
case SQLITE_CREATE_TABLE: /* Table Name NULL */
case SQLITE_ALTER_TABLE: /* Database Name Table Name */
return SQLITE_DENY;
case SQLITE_CREATE_TEMP_INDEX: /* Index Name Table Name */
case SQLITE_CREATE_TEMP_TABLE: /* Table Name NULL */
case SQLITE_CREATE_TEMP_TRIGGER: /* Trigger Name Table Name */
case SQLITE_CREATE_TEMP_VIEW: /* View Name NULL */
return SQLITE_OK;
case SQLITE_CREATE_TRIGGER: /* Trigger Name Table Name */
case SQLITE_CREATE_VIEW: /* View Name NULL */
case SQLITE_DELETE: /* Table Name NULL */
case SQLITE_DROP_INDEX: /* Index Name Table Name */
case SQLITE_DROP_TABLE: /* Table Name NULL */
return SQLITE_DENY;
case SQLITE_DROP_TEMP_INDEX: /* Index Name Table Name */
case SQLITE_DROP_TEMP_TABLE: /* Table Name NULL */
case SQLITE_DROP_TEMP_TRIGGER: /* Trigger Name Table Name */
case SQLITE_DROP_TEMP_VIEW: /* View Name NULL */
return SQLITE_OK;
case SQLITE_DROP_TRIGGER: /* Trigger Name Table Name */
case SQLITE_DROP_VIEW: /* View Name NULL */
case SQLITE_INSERT: /* Table Name NULL */
return SQLITE_DENY;
case SQLITE_PRAGMA: /* Pragma Name 1st arg or NULL */
return SQLITE_DENY;
case SQLITE_READ: /* Table Name Column Name */
case SQLITE_SELECT: /* NULL NULL */
#if SQLITE_VERSION_NUMBER > 3003007
case SQLITE_FUNCTION: /* Function Name NULL */
#endif
return SQLITE_OK;
case SQLITE_TRANSACTION: /* NULL NULL */
case SQLITE_UPDATE: /* Table Name Column Name */
case SQLITE_ATTACH: /* Filename NULL */
case SQLITE_DETACH: /* Database Name NULL */
default:
return SQLITE_DENY;
}
}
static void sqlite3_concat(sqlite3_context* ctx, int argc, sqlite3_value** argv)
{
int len=0, i=0, j=0;
char *r = NULL;
for(;i<argc; ++i) len+=sqlite3_value_bytes(argv[i]);
r = malloc(len+1);
for(i=0, j=0; i<argc; ++i) {
int s = sqlite3_value_bytes(argv[i]);
strncpy(r+j, (const char*)sqlite3_value_text(argv[i]), s);
j += s;
}
r[j]='\0';
sqlite3_result_text(ctx, r, len, free);
}
static int sq_sqlite3_create_functions(sqlite3 *conn)
{
return sqlite3_create_function(conn, "concat", -1, SQLITE_UTF8,
NULL, sqlite3_concat, NULL, NULL);
}
static void sqlite3_set_authorizer_restricted_ro(sqlite3 *cnx)
{
sqlite3_set_authorizer(cnx, rs_sqlite3_auth_restricted_ro, NULL);
}
static void sqlite3_set_authorizer_restricted(sqlite3 *cnx)
{
sqlite3_set_authorizer(cnx, rs_sqlite3_auth_restricted, NULL);
}
/* setup function table */
static void sqlite3_setup_full(sqlite3 *cnx)
{
sq_sqlite3_create_functions(cnx);
}
static void sqlite3_setup_restricted(sqlite3 *cnx)
{
sqlite3_set_authorizer(cnx, rs_sqlite3_auth_restricted, NULL);
sq_sqlite3_create_functions(cnx);
}
static void sqlite3_setup_restricted_ro(sqlite3 *cnx)
{
sqlite3_set_authorizer(cnx, rs_sqlite3_auth_restricted_ro, NULL);
sq_sqlite3_create_functions(cnx);
}
static void (*setup_table[4])(sqlite3 *) = {
NULL,
sqlite3_setup_full,
sqlite3_setup_restricted,
sqlite3_setup_restricted_ro
};
static void sqlite3_setup(sqlite3 *cnx, int i)
{
void (*f)(sqlite3 *);
assert(i<4);
f=setup_table[i];
if(f) (*f)(cnx);
}
static pthread_mutex_t the_shared_memory_mux;
static struct callback_args *the_shared_memory_for_open = NULL;
void lock_callback_open_parameters(struct callback_args * x)
{
pthread_mutex_lock(&the_shared_memory_mux);
the_shared_memory_for_open = x;
}
void unlock_callback_open_parameters()
{
the_shared_memory_for_open = NULL;
pthread_mutex_unlock(&the_shared_memory_mux);
}
typedef int (*askemos_request_function_t)(void *);
extern int
start_asynchronous_request(askemos_request_function_t function,
void *data, void *callback);
extern void
C_interrupt_call(void *callback, void *converter, void *result);
struct callback_args {
pthread_mutex_t mux;
pthread_cond_t cond;
C_GC_ROOT *ref;
int op;
int size;
int amount;
sqlite_int64 offset;
void *buf;
/* char buf[1]; */
};
//#define TRACE 1
struct open_args {
sqlite3 *cnx;
int setup;
struct callback_args *sm;
char *vfs;
char dbn[2];
};
static int
pthread_sqlite3_open(void *data)
{
struct open_args *a = data;
int rc;
#ifdef TRACE
fprintf(stderr, "DB %s vfs %s SM %p\n", a->dbn, a->vfs, a->sm);
#endif
if(a->sm != NULL) {
lock_callback_open_parameters(a->sm);
}
rc = sqlite3_open_v2( a->dbn,
&a->cnx,
( a->setup == 3 ? SQLITE_OPEN_READONLY :
( SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE ) )
| SQLITE_OPEN_NOMUTEX,
a->vfs);
/* unlock_callback_open_parameters(); done within open ASAP */
sqlite3_setup(a->cnx, a->setup);
return SQLITE_OK;
}
struct sqlite3_db {
sqlite3 *cnx;
size_t bufsize;
void *buf;
};
static int pthread_sqlite3_close(void *data)
{
struct sqlite3_db *a = data;
int rc = sqlite3_close(a->cnx);
free(a->buf);
free(a);
return rc;
}
struct prepare_args {
sqlite3_stmt *stmt;
int tail;
sqlite3 *db;
int sql_len;
int offset;
char sql[1];
};
static int pthread_sqlite3_prepare(void *data)
{
struct prepare_args *a = data;
int rc;
const char *tail;
#ifdef TRACE
fprintf(stderr, "prepar %p >>%s<< %d %d\n", a->db, a->sql, a->offset, a->sql_len);
#endif
rc = sqlite3_prepare_v2( a->db,
a->sql + a->offset,
a->sql_len - a->offset,
&a->stmt,
&tail );
if (a->stmt != NULL) {
a->tail = tail - a->sql;
}
return rc;
}
/*
** 2009 Juli 24
**
** This file contains OS interface code that is used with Askemos(R).
**/
/*
** standard include files.
*/
#include <string.h>
#include <pthread.h>
/*
** Maximum pathname length supported by the callback backend.
*/
#define CALLBACK_MAX_PATHNAME 512
#define CB_IO_TYPE_BLKSZ 1
#define CB_IO_TYPE_FSIZE 2
#define CB_IO_TYPE_READ 3
#define CB_IO_TYPE_WRITE 4
#define CB_IO_TYPE_TRUNC 5
#define CB_IO_TYPE_CLOSE 6
/*
** Pointer-Makros
*/
/* pointer to the parent sqlite3_vfs structure (e.g. unix) */
sqlite3_vfs *pVfs = NULL;
// "host" callback
static void *the_callback_callback = NULL;
static void *the_callback_arg_converter = NULL;
typedef struct callback_file callback_file;
struct callback_file {
sqlite3_io_methods *ioMethods;
const char* zName;
/* shared memory
*
* form: #((int opcode) (string buf) (int amount) (int offset) (pointer (condition done))
*/
struct callback_args *sm;
// pointer to the sqlite3_file structure of the parent vfs (e.g. unix vfs)
sqlite3_file pReal[1];
};
/*
** Method declarations for callback_file.
*/
static int callbackClose(sqlite3_file*);
static int callbackRead(sqlite3_file*, void*, int iAmt, sqlite3_int64 iOfst);
static int callbackWrite(sqlite3_file*,const void*,int iAmt, sqlite3_int64 iOfst);
static int callbackTruncate(sqlite3_file*, sqlite3_int64 size);
static int callbackFileSize(sqlite3_file*, sqlite3_int64 *pSize);
static int callbackFileControl(sqlite3_file*, int op, void *pArg);
static int callbackSectorSize(sqlite3_file*);
static int callbackDeviceCharacteristics(sqlite3_file*);
static int callbackCloseNot(sqlite3_file*);
static int callbackReadNot(sqlite3_file*, void*, int iAmt, sqlite3_int64 iOfst);
static int callbackWriteNot(sqlite3_file*,const void*,int iAmt, sqlite3_int64 iOfst);
static int callbackTruncateNot(sqlite3_file*, sqlite3_int64 size);
static int callbackSyncNot(sqlite3_file*, int flags);
static int callbackFileSizeNot(sqlite3_file*, sqlite3_int64 *pSize);
static int callbackSyncNot(sqlite3_file*, int flags);
static int callbackLockNot(sqlite3_file*, int);
static int callbackUnlockNot(sqlite3_file*, int);
static int callbackCheckReservedLockNot(sqlite3_file*, int *pResOut);
static int callbackSectorSizeNot(sqlite3_file*);
/*
** Method declarations for callback_vfs.
*/
static int callbackOpen(sqlite3_vfs*, const char *, sqlite3_file*, int , int *);
static int callbackDelete(sqlite3_vfs*, const char *zName, int syncDir);
static int callbackAccess(sqlite3_vfs*, const char *zName, int flags, int *);
static int callbackFullPathname(sqlite3_vfs*, const char *zName, int, char *zOut);
static void *callbackDlOpen(sqlite3_vfs*, const char *zFilename);
static void callbackDlError(sqlite3_vfs*, int nByte, char *zErrMsg);
static void (*callbackDlSym(sqlite3_vfs *pVfs, void *p, const char*zSym))(void);
static void callbackDlClose(sqlite3_vfs*, void*);
static int callbackRandomness(sqlite3_vfs*, int nByte, char *zOut);
static int callbackSleep(sqlite3_vfs*, int microseconds);
static int callbackCurrentTime(sqlite3_vfs*, double*);
static sqlite3_vfs callback_vfs = {
1, /* iVersion */
sizeof(callback_file), /* szOsFile */
CALLBACK_MAX_PATHNAME, /* mxPathname */
0, /* pNext */
"askemos", /* zName */
0, /* pAppData */
callbackOpen, /* xOpen */
callbackDelete, /* xDelete */
callbackAccess, /* xAccess */
callbackFullPathname, /* xFullPathname */
callbackDlOpen, /* xDlOpen */
callbackDlError, /* xDlError */
callbackDlSym, /* xDlSym */
callbackDlClose, /* xDlClose */
callbackRandomness, /* xRandomness */
callbackSleep, /* xSleep */
callbackCurrentTime /* xCurrentTime */
};
static sqlite3_io_methods callback_io_methods = {
1, /* iVersion */
callbackClose, /* xClose */
callbackRead, /* xRead */
callbackWrite, /* xWrite */
callbackTruncate, /* xTruncate */
callbackSyncNot, /* xSync */
callbackFileSize, /* xFileSize */
callbackLockNot, /* xLock */
callbackUnlockNot, /* xUnlock */
callbackCheckReservedLockNot, /* xCheckReservedLock */
callbackFileControl, /* xFileControl */
callbackSectorSize, /* xSectorSize */
callbackDeviceCharacteristics /* xDeviceCharacteristics */
};
static sqlite3_io_methods callback_io_noop_methods = {
1, /* iVersion */
callbackCloseNot, /* xClose */
callbackReadNot, /* xRead */
callbackWriteNot, /* xWrite */
callbackTruncateNot, /* xTruncate */
callbackSyncNot, /* xSync */
callbackFileSizeNot, /* xFileSize */
callbackLockNot, /* xLock */
callbackUnlockNot, /* xUnlock */
callbackCheckReservedLockNot, /* xCheckReservedLock */
callbackFileControl, /* xFileControl */
callbackSectorSizeNot, /* xSectorSize */
callbackDeviceCharacteristics /* xDeviceCharacteristics */
};
/*
** Open an callback file handle.
*/
static int callbackOpen(
sqlite3_vfs *pCallbackVfs,
const char *zName,
sqlite3_file *pFile,
int flags,
int *pOutFlags
){
callback_file *p = (callback_file *) pFile;
/* Only for main db, to save energy. */
if(flags & SQLITE_OPEN_MAIN_DB) {
#ifdef TRACE
fprintf(stderr, "A open Main %s\n", zName);
#endif
p->ioMethods = &callback_io_methods;
p->zName = zName;
p->sm = the_shared_memory_for_open;
unlock_callback_open_parameters();
return SQLITE_OK;
} else if (flags & SQLITE_OPEN_MAIN_JOURNAL) {
p->ioMethods = &callback_io_noop_methods;
return SQLITE_OK;
}
#ifdef TRACE
fprintf(stderr, "A open Else %s\n", zName);
#endif
return pVfs->xOpen(pVfs, zName, pFile, flags, pOutFlags);
}
/*
* fill callback args
*/
static callback_file *
fill_callback_args(sqlite3_file *sf, short type, void *zBuf, int iAmt,
sqlite_int64 iOfst)
{
callback_file *p = (callback_file *) sf;
struct callback_args *a = p->sm;
a->op = type;
a->amount = iAmt;
a->offset = iOfst;
a->buf = zBuf;
return p;
}
static int call_callback(callback_file *cf)
{
struct callback_args *a = cf->sm;
pthread_mutex_lock(&a->mux);
// calling chicken to add the blocklist to the mailbox (read)
C_interrupt_call(the_callback_callback, the_callback_arg_converter, a);
// waiting for the call to complete
pthread_cond_wait(&a->cond, &a->mux);
pthread_mutex_unlock(&a->mux);
return a->op;
}
/*
** Close an callback-file.
*/
static int callbackCloseNot(sqlite3_file *pFile)
{
return SQLITE_OK;
}
static int callbackClose(sqlite3_file *sf){
#ifdef TRACE
fprintf(stderr, "A close %p\n", sf);
#endif
return call_callback(fill_callback_args(sf, CB_IO_TYPE_CLOSE, NULL, 0, 0));
}
/*
** Read data from an callback-file.
*/
static int callbackReadNot(sqlite3_file *sf, void *zBuf,
int iAmt, sqlite_int64 iOfst)
{
#ifdef TRACE
fprintf(stderr, "read not %p\n", sf);
#endif
return SQLITE_IOERR;
}
static int callbackRead(sqlite3_file *sf, void *zBuf,
int iAmt, sqlite_int64 iOfst)
{
#ifdef TRACE
fprintf(stderr, "cb read %p %p %d %ld\n", sf, zBuf, iAmt, (long int) iOfst);
#endif
return call_callback(fill_callback_args(sf, CB_IO_TYPE_READ, zBuf, iAmt, iOfst));
}
/*
** Write data to an callback-file.
*/
static int callbackWriteNot(sqlite3_file *pFile, const void *zBuf,
int iAmt, sqlite_int64 iOfst) {
return SQLITE_OK;
}
static int callbackWrite(
sqlite3_file *sf, // structure
const void *zBuf, // buffer with data
int iAmt, // amount of bytes to write
sqlite_int64 iOfst // offset of file to write
){
#ifdef TRACE
fprintf(stderr, "cb write %p %p %d %ld\n", sf, zBuf, iAmt, (long int) iOfst);
#endif
return call_callback(fill_callback_args(sf, CB_IO_TYPE_WRITE, (void*) zBuf, iAmt, iOfst));
}
/*
** Truncate an callback-file.
*/
static int callbackTruncateNot(sqlite3_file *pFile, sqlite_int64 size)
{
return SQLITE_OK;
}
static int callbackTruncate(sqlite3_file *sf, sqlite_int64 size)
{
#ifdef TRACE
fprintf(stderr, "Atruncate\n");
#endif
return call_callback(fill_callback_args(sf, CB_IO_TYPE_TRUNC, NULL, 0, size));
}
/*
** Sync an callback-file.
*/
static int callbackSyncNot(sqlite3_file *pFile, int flags)
{
return SQLITE_OK;
}
/*
** Return the current file-size of an callback-file.
*/
static int callbackFileSizeNot(sqlite3_file *pFile, sqlite_int64 *pSize)
{
*pSize = 0;
return SQLITE_OK;
}
static int callbackFileSize(sqlite3_file *sf, sqlite_int64 *pSize)
{
callback_file *p = (callback_file *) sf;
struct callback_args *a = p->sm;
int rc;
#ifdef TRACE
fprintf(stderr, "cb fileSize\n");
#endif
rc = call_callback(fill_callback_args(sf, CB_IO_TYPE_FSIZE, pSize, 0, 0));
#ifdef TRACE
fprintf(stderr, "cb fileSize %ld\n", (long) a->offset);
#endif
*pSize = a->offset;
return SQLITE_OK;
}
/*
** Lock an callback-file.
*/
static int callbackLockNot(sqlite3_file *pFile, int eLock)
{
return SQLITE_OK;
}
static int callbackLock(sqlite3_file *pFile, int eLock)
{
callback_file *p = (callback_file *) pFile;
#ifdef TRACE
fprintf(stderr, "Alock\n");
#endif
return SQLITE_OK;
}
/*
** Unlock an callback-file.
*/
static int callbackUnlockNot(sqlite3_file *pFile, int eLock)
{
return SQLITE_OK;
}
static int callbackUnlock(sqlite3_file *pFile, int eLock)
{
callback_file *p = (callback_file *) pFile;
#ifdef TRACE
fprintf(stderr, "Aunlock %s\n", p->zName);
#endif
return SQLITE_OK;
}
/*
** Check if another file-handle holds a RESERVED lock on an callback-file.
*/
static int callbackCheckReservedLockNot(sqlite3_file *pFile, int *pResOut)
{
return SQLITE_OK;
}
static int callbackCheckReservedLock(sqlite3_file *pFile, int *pResOut)
{
callback_file *p = (callback_file *) pFile;
#ifdef TRACE
fprintf(stderr, "cb check reserved lock\n");
#endif
return SQLITE_OK;
}
/*
** File control method. For custom operations on an callback-file.
*/
static int callbackFileControl(sqlite3_file *pFile, int op, void *pArg)
{
return SQLITE_OK;
}
/*
** Return the sector-size in bytes for an callback-file.
*/
static int callbackSectorSizeNot(sqlite3_file *pFile)
{
return 0;
}
static int callbackSectorSize(sqlite3_file *sf)
{
callback_file *p = (callback_file *) sf;
struct callback_args *a = p->sm;
call_callback(fill_callback_args(sf, CB_IO_TYPE_BLKSZ, NULL, 0, 0));
#ifdef TRACE
fprintf(stderr, "SectorSize %ld\n", (long) a->offset);
#endif
return a->offset;
}
/*
** Return the device characteristic flags supported by an callback-file.
*/
static int callbackDeviceCharacteristics(sqlite3_file *pFile)
{
return SQLITE_IOCAP_ATOMIC | SQLITE_IOCAP_SAFE_APPEND;
}
/*
** Delete the file located at zPath. If the dirSync argument is true,
** ensure the file-system modifications are synced to disk before
** returning.
*/
static int callbackDelete(sqlite3_vfs *aVfs, const char *zPath, int dirSync)
{
return pVfs->xDelete(pVfs, zPath, dirSync);
}
/*
** Test for access permissions. Return true if the requested permission
** is available, or false otherwise.
*/
static int callbackAccess(
sqlite3_vfs *aVfs,
const char *zPath,
int flags,
int *pResOut
)
{
int rc = pVfs->xAccess(pVfs, zPath, flags, pResOut);
#ifdef TRACE
fprintf(stderr, "A access %s want %d got %d\n", zPath, flags, *pResOut);
#endif
return rc;
}
/*
** Populate buffer zOut with the full canonical pathname corresponding
** to the pathname in zPath. zOut is guaranteed to point to a buffer
** of at least (CALLBACK_MAX_PATHNAME+1) bytes.
*/
static int callbackFullPathname(
sqlite3_vfs *aVfs,
const char *zPath,
int nOut,
char *zOut
){
#ifdef TRACE
fprintf(stderr, "cb fpn %s\n", zPath);
#endif
return pVfs->xFullPathname(pVfs, zPath, nOut, zOut);
}
/*
** Open the dynamic library located at zPath and return a handle.
*/
static void *callbackDlOpen(sqlite3_vfs *aVfs, const char *zPath){
#ifdef TRACE
fprintf(stderr, "Adlop\n");
#endif
return pVfs->xDlOpen(pVfs, zPath);
}
/*
** Populate the buffer zErrMsg (size nByte bytes) with a human readable
** utf-8 string describing the most recent error encountered associated
** with dynamic libraries.
*/
static void callbackDlError(sqlite3_vfs *aVfs, int nByte, char *zErrMsg){
#ifdef TRACE
fprintf(stderr, "Adlerr\n");
#endif
return pVfs->xDlError(pVfs, nByte, zErrMsg);
}
/*
** Return a pointer to the symbol zSymbol in the dynamic library pHandle.
*/
static void (*callbackDlSym(sqlite3_vfs *aVfs, void *p, const char *zSym))(void){
#ifdef TRACE
fprintf(stderr, "Adlsy\n");
#endif
return pVfs->xDlSym(pVfs, p, zSym);
}
/*
** Close the dynamic library handle pHandle.
*/
static void callbackDlClose(sqlite3_vfs *aVfs, void *pHandle){
#ifdef TRACE
fprintf(stderr, "Adlclo\n");
#endif
return pVfs->xDlClose(pVfs, pHandle);
}
/*
** Populate the buffer pointed to by zBufOut with nByte bytes of
** random data.
*/
static int callbackRandomness(sqlite3_vfs *aVfs, int nByte, char *zBufOut){
#ifdef TRACE
fprintf(stderr, "arand\n");
#endif
return pVfs->xRandomness(pVfs, nByte, zBufOut);
}
/*
** Sleep for nMicro microseconds. Return the number of microseconds
** actually slept.
*/
static int callbackSleep(sqlite3_vfs *aVfs, int nMicro){
#ifdef TRACE
fprintf(stderr, "Aslee\n");
#endif
return pVfs->xSleep(pVfs, nMicro);
}
/*
** Return the current time as a Julian Day number in *pTimeOut.
*/
static int callbackCurrentTime(sqlite3_vfs *pVfs, double *pTimeOut){
#ifdef TRACE
fprintf(stderr, "Act\n");
#endif
return pVfs->xCurrentTime(pVfs, pTimeOut);
}
/*
** Initialising the wrapper vfs callback_vfs
*/
#define max(a,b) ((a) > (b) ? (a) : (b))
sqlite3_vfs *callback_sqlite3_vfs_init(void *cb, void *cbr) {
#ifdef TRACE
fprintf(stderr, "regring %s \n", callback_vfs.zName);
#endif
pthread_mutex_init(&the_shared_memory_mux, NULL);
if(pVfs == NULL ) {
pVfs = sqlite3_vfs_find(NULL); // fetch default vfs
if( !pVfs ){
return NULL;
}
callback_vfs.szOsFile = callback_vfs.szOsFile - sizeof(sqlite3_file) + pVfs->szOsFile;
sqlite3_vfs_register(&callback_vfs, 0);
}
the_callback_callback = cb;
the_callback_arg_converter = cbr;
#ifdef TRACE
fprintf(stderr, "registered %s \n", callback_vfs.zName);
#endif
return &callback_vfs;
}
EOF
))
(module
sqlite3
(
sql-field sql-index
sql-with-tupels%-fold-left
sqlite3-exec
sql-close
sql-result?
sql-value
sql-connect
sql-with-tupels
sql-ref
sqlite3-statement-name
sqlite3-open-restricted-ro
sqlite3-statement-container
sqlite3-database-prep-cache
sqlite3-database-name
sqlite3-error?
sqlite3-changes
sqlite3-statement?
sqlite3-statement-raw-pointer
sqlite3-database-open-statements
sqlite3-error-cond
sqlite3-open sqlite3-close
sqlite3-open-restricted
sqlite3-error-args
make-sqlite3-statement
;;
sqlite3-bugworkaround-reset-restrictions
)
(import scheme foreign
(except chicken with-exception-handler condition?)
(except srfi-18 raise) srfi-34 srfi-35
srfi-1 srfi-69 extras util atomic pthreads)
(define new-gc-root
(foreign-lambda*
c-pointer ()
"C_GC_ROOT *r=CHICKEN_new_gc_root();"
"return(r);"))
(define make-gc-root
(foreign-lambda*
c-pointer ((scheme-object obj))
"C_GC_ROOT *r=CHICKEN_new_gc_root();"
"CHICKEN_gc_root_set(r, obj);"
"return(r);"))
(define delete-gc-root
(foreign-lambda*
void ((c-pointer r))
"CHICKEN_gc_root_set(r, C_SCHEME_FALSE);"
"CHICKEN_delete_gc_root(r);"))
(define set-gc-root!
(foreign-lambda*
void ((c-pointer r) (scheme-object obj))
"CHICKEN_gc_root_set((C_GC_ROOT *) r, obj);"))
(define (make-object-table)
(make-hash-table eq? eq?-hash))
(define *the-db-drivers* '())
(define (raise-no-db connection msg)
(raise (condition (&message (message (format "No sqlite3 connection ~a for ~a" connection msg))))))
(define (sql-connect driver db host user pass)
(let ((entry (assoc driver *the-db-drivers*)))
(if entry
((cdr entry) db host user pass)
(error (string-append "sql-connect unsupported driver '"
driver
"' requested.")))))
(define (sql-close obj)
((cond
((sqlite3-database? obj) sqlite3-close)
(else (raise-no-db obj 'sql-close)))
obj))
(define sql-result? vector?)
(define (sql-value result row field)
((cond
((sql-result? result) sqlite3-value)
(else (raise
(condition (&message (message (format "sql-value not a sqlite3 result ~a" result)))))))
result row field))
(define (sql-field result field)
((cond
((sql-result? result) sqlite3-field)
(else (raise
(condition (&message (message (format "sql-field not a sqlite3 result ~a" result)))))))
result field))
(define (sql-with-tupels connection query proc)
((cond
((sqlite3-database? connection) sqlite3-with-tupels)
(else (raise-no-db connection query)))
connection query proc))
;; WARNING: sql-index, sql-field and sql-value are experimental.
(define (sql-index self field)
(and (fx> (vector-length self) 0)
(vassoc field (vector-ref self 0))))
(define (sqlite3-field self field)
(vector-ref (vector-ref self 0) field))
(define (sqlite3-value self row field)
(vector-ref (vector-ref self (add1 row))
(or (cond
((integer? field) field)
((string? field) (sql-index self field))
((symbol? field) (sql-index self (symbol->string field)))
(else (error (format "sql-value bad index type ~s" field))))
(error (format "no field ~a in ~a" field
(vector-ref self 0))))))
(define (sql-ref self row field)
(cond
((and row field) (sql-value self row field))
(field (sql-index self field))
(else (sub1 (vector-length self)))))
(define (one-shot-sql-tupels%-fold-left db query setup-seeds fold-function seeds)
(define (range n)
(let loop ((i 0))
(if (eqv? i n) '() (cons i (loop (add1 i))))))
;; TODO fix the mxsql-driver to actually return the value!
(sql-with-tupels
db query
(lambda (result rows cols)
(if (eqv? rows 0)
(apply list->values (append! seeds (setup-seeds result rows cols)))
(let ((cols (range cols)))
(let loop ((seeds (append! seeds (setup-seeds result rows cols)))
(row 0))
(if (eqv? row rows)
(apply list->values seeds)
(receive (proceed . seeds)
(apply fold-function
(map (lambda (field)
(sql-value result row field))
cols)
seeds)
(if proceed
(loop seeds (add1 row))
(apply list->values seeds))))))))))
(define (sql-with-tupels%-fold-left db query setup-seeds fold-function seed . seeds)
(one-shot-sql-tupels%-fold-left db query setup-seeds fold-function (cons seed seeds)))
(define-record-type <sqlite3-database>
(%make-sqlite3-database raw-pointer callback-args callback open-statemets prep-cache name)
sqlite3-database?
(raw-pointer sqlite3-database-raw-pointer set-sqlite3-database-raw-pointer!)
(callback-args sqlite3-database-callback-args set-sqlite3-database-callback-args!)
(callback sqlite3-database-callback)
(open-statemets sqlite3-database-open-statements)
(prep-cache sqlite3-database-prep-cache)
(name sqlite3-database-name))
(define (make-sqlite3-database raw-pointer callback-args callback open-statemets prep-cache name)
(let ((r (%make-sqlite3-database raw-pointer callback-args callback open-statemets prep-cache name)))
(set-finalizer! r sqlite3-close)
r))
(define (sqlite3-with-tupels self query proc)
;; FIXME we need to write a per connection thread.
(let* ((result ; (sqlite3-async-exec self query)
(sqlite3-exec self query))
(rows (vector-length result)))
(if (eqv? rows 0)
(proc result 0 0)
(proc result (sub1 rows) (vector-length (vector-ref result 0))))))
(define (vassoc val vec)
(do ((i 0 (add1 i)))
((or (eqv? i (vector-length vec))
(equal? val (vector-ref vec i)))
(and (fx< i (vector-length vec)) i))))
(define-condition-type &sqlite3-error &error sqlite3-error?
(code sqlite3-error-cond)
(args sqlite3-error-args))
(define-foreign-variable SQLITE_OTHER int "SQLITE_OTHER")
(define-foreign-variable SQLITE_OK int "SQLITE_OK")
(define-foreign-variable SQLITE_ERROR int "SQLITE_ERROR")
(define-foreign-variable SQLITE_INTERNAL int "SQLITE_INTERNAL")
(define-foreign-variable SQLITE_PERM int "SQLITE_PERM")
(define-foreign-variable SQLITE_ABORT int "SQLITE_ABORT")
(define-foreign-variable SQLITE_BUSY int "SQLITE_BUSY")
(define-foreign-variable SQLITE_LOCKED int "SQLITE_LOCKED")
(define-foreign-variable SQLITE_NOMEM int "SQLITE_NOMEM")
(define-foreign-variable SQLITE_READONLY int "SQLITE_READONLY")
(define-foreign-variable SQLITE_INTERRUPT int "SQLITE_INTERRUPT")
(define-foreign-variable SQLITE_IOERR int "SQLITE_IOERR")
(define-foreign-variable SQLITE_IOERR_SHORT_READ int "SQLITE_IOERR_SHORT_READ")
(define-foreign-variable SQLITE_CORRUPT int "SQLITE_CORRUPT")
(define-foreign-variable SQLITE_NOTFOUND int "SQLITE_NOTFOUND")
(define-foreign-variable SQLITE_FULL int "SQLITE_FULL")
(define-foreign-variable SQLITE_CANTOPEN int "SQLITE_CANTOPEN")
(define-foreign-variable SQLITE_PROTOCOL int "SQLITE_PROTOCOL")
(define-foreign-variable SQLITE_EMPTY int "SQLITE_EMPTY")
(define-foreign-variable SQLITE_SCHEMA int "SQLITE_SCHEMA")
(define-foreign-variable SQLITE_TOOBIG int "SQLITE_TOOBIG")
(define-foreign-variable SQLITE_CONSTRAINT int "SQLITE_CONSTRAINT")
(define-foreign-variable SQLITE_MISMATCH int "SQLITE_MISMATCH")
(define-foreign-variable SQLITE_MISUSE int "SQLITE_MISUSE")
(define-foreign-variable SQLITE_NOLFS int "SQLITE_NOLFS")
(define-foreign-variable SQLITE_AUTH int "SQLITE_AUTH")
(define-foreign-variable SQLITE_ROW int "SQLITE_ROW")
(define-foreign-variable SQLITE_DONE int "SQLITE_DONE")
(define-foreign-type <raw-sqlite3-database> (c-pointer "struct sqlite3_db"))
(define (sqlite3-bugworkaround-reset-restrictions db)
((foreign-lambda* void ((<raw-sqlite3-database> a)) "sqlite3_set_authorizer_unrestricted(a->cnx);")
(sqlite3-database-raw-pointer db)))
(define-foreign-type <raw-sqlite3-statement> (c-pointer "sqlite3_stmt"))
(define-record-type <sqlite3-statement>
(make-sqlite3-statement raw-pointer container name)
sqlite3-statement?
(raw-pointer sqlite3-statement-raw-pointer)
(container sqlite3-statement-container)
(name sqlite3-statement-name)
)
(define-inline (sqlite3-run-fn root param fn)
((vector-ref root 1) param fn))
(define (sqlite3-error-message d)
((foreign-lambda*
c-string ((<raw-sqlite3-database> db))
"return(sqlite3_errmsg(db->cnx));")
(sqlite3-database-raw-pointer d)))
(define-foreign-type <sqlite3-callback-args> (c-pointer "struct callback_args"))
(define alloc-callback
(foreign-lambda*
<sqlite3-callback-args>
((integer size)
(scheme-object db))
#<<EOF
struct callback_args *a=malloc(sizeof(struct callback_args) + size);
pthread_mutex_init(&a->mux, NULL);
pthread_cond_init(&a->cond, NULL);
a->size = size;
a->ref = CHICKEN_new_gc_root();
CHICKEN_gc_root_set(a->ref, db);
return(a);
EOF
))
(define-foreign-type <sqlite3-open-args> (c-pointer "struct open_args"))
(define open-args
(foreign-lambda*
<sqlite3-open-args>
((scheme-object dbn)
(integer dbnlen)
(integer setup)
(scheme-object vfs)
(integer vfslen)
(<sqlite3-callback-args> sm))
#<<EOF
struct open_args *a=malloc(sizeof(struct open_args) + dbnlen + vfslen);
a->cnx = NULL;
strncpy(a->dbn, C_c_string(dbn), dbnlen);
a->dbn[dbnlen]='\0';
a->setup = setup;
if( vfs == C_SCHEME_FALSE ) {
a->vfs=NULL;
a->sm=NULL;
} else {
a->sm=sm;
a->vfs = a->dbn+dbnlen+1;
strncpy(a->vfs, C_c_string(vfs), vfslen);
a->vfs[vfslen]='\0';
}
return(a);
EOF
))
(define free-callback-args
(foreign-lambda*
void ((<sqlite3-callback-args> a))
"pthread_mutex_destroy(&a->mux);"
"pthread_cond_destroy(&a->cond);"
"CHICKEN_gc_root_set(a->ref, C_SCHEME_FALSE);"
"CHICKEN_delete_gc_root(a->ref);"
"free(a);"))
(define sqlite3-start-open
(foreign-lambda* void ((<sqlite3-open-args> s) (c-pointer callback))
"start_asynchronous_request(pthread_sqlite3_open, s, callback);"))
(define (make-callback-interface obj)
(alloc-callback 0 obj))
(define (sqlite3-open* dbn setup vfs sm)
(let ((root (let ((mux (make-mutex 'sqlite3))
(cv (make-condition-variable 'sqlite3))
(result #f)
(root (new-gc-root)))
(let ((cb (lambda (x)
(mutex-lock! mux)
(set! result x)
(mutex-unlock! mux)
(condition-variable-signal! cv)))
(req (lambda (param fn)
(set! result mux)
(mutex-lock! mux)
(semaphore-wait pthread-pool-load)
(fn param root)
(semaphore-signal pthread-pool-load)
(let loop ()
(mutex-unlock! mux cv)
(mutex-lock! mux)
(if (eq? result mux) (loop)
(begin
(mutex-unlock! mux)
(values result param)))))))
(set-gc-root! root cb)
(vector root req)))))
(call-with-values
(lambda ()
(sqlite3-run-fn root
(open-args dbn (string-length dbn)
setup
vfs (if vfs (string-length vfs) 0)
sm)
sqlite3-start-open))
(lambda (result param)
(if (eqv? result SQLITE_OK)
(values
root
((foreign-lambda*
<raw-sqlite3-database>
((<sqlite3-open-args> a)
(integer additional))
;;"sqlite3 *cnx = a->cnx; free(a); return(cnx);"
"struct sqlite3_db *db = malloc(sizeof(struct sqlite3_db));"
"db->cnx = a->cnx;"
"db->bufsize = sizeof(struct open_args) + additional;"
"db->buf = a;"
"return(db);"
)
param
(fx+ (string-length dbn) (if vfs (string-length vfs) 0))))
(begin
((foreign-lambda* void ((<sqlite3-open-args> a)) "free(a);") param)
(delete-gc-root (vector-ref root 0))
(if sm (free-callback-args sm))
(error (format "sqlite3-open* returned ~a" result))))))))
(define (sqlite3-open dbn)
(receive
(cb raw-db) (sqlite3-open* dbn 1 #f #f)
(make-sqlite3-database raw-db #f cb (make-object-table) (make-string-table) dbn)))
(define (sqlite3-open-restricted dbn . vfs)
(if (pair? vfs)
(let ((sm (make-callback-interface (cadr vfs))))
(receive (cb raw-db) (sqlite3-open* dbn 2 (car vfs) sm)
(make-sqlite3-database
raw-db sm cb (make-object-table) (make-string-table) dbn)))
(receive (cb raw-db) (sqlite3-open* dbn 2 #f #f)
(make-sqlite3-database
raw-db #f cb (make-object-table) (make-string-table) dbn))))
(define (sqlite3-open-restricted-ro dbn . vfs)
(if (pair? vfs)
(let ((sm (make-callback-interface (cadr vfs))))
(receive (cb raw-db) (sqlite3-open* dbn 3 (car vfs) sm)
(make-sqlite3-database
raw-db sm cb (make-object-table) (make-string-table) dbn)))
(receive (cb raw-db) (sqlite3-open* dbn 3 #f #f)
(make-sqlite3-database
raw-db #f cb (make-object-table) (make-string-table) dbn))))
(define sqlite3-start-close
(foreign-lambda* void ((<raw-sqlite3-database> s) (c-pointer callback))
"start_asynchronous_request((askemos_request_function_t) pthread_sqlite3_close, s, callback);"))
(define (sqlite3-close db)
(if (sqlite3-database-raw-pointer db)
(let ((raw (sqlite3-database-raw-pointer db)))
(set-sqlite3-database-raw-pointer! db #f)
(receive
(rc dbo)
(sqlite3-run-fn (sqlite3-database-callback db) raw sqlite3-start-close)
(delete-gc-root (vector-ref (sqlite3-database-callback db) 0))
(and-let* ((sm (sqlite3-database-callback-args db)))
(set-sqlite3-database-callback-args! db #f)
(free-callback-args sm))
(if (not (eqv? rc SQLITE_OK))
(condition (&message (message (sqlite3-error-message db)))))))))
(define sqlite3-changes
(foreign-lambda*
integer ((<raw-sqlite3-database> db))
"return(sqlite3_changes(db->cnx));" ))
(define sqlite3-finalize*
(foreign-lambda integer "sqlite3_finalize" <raw-sqlite3-statement>))
(define (sqlite3-finalize db stmt)
(let ((v (sqlite3-finalize* stmt)))
(or (eqv? v SQLITE_OK)
(raise (condition (&message (message (sqlite3-error-message db))))))))
(define-foreign-type <sqlite3-prepare-args> (c-pointer "struct prepare_args"))
(define prepare-args
(foreign-lambda*
<sqlite3-prepare-args>
((<raw-sqlite3-database> db)
(scheme-object sql)
(integer sqllen)
(integer offset))
"size_t bufsize = sizeof(struct prepare_args) + sqllen;"
"if(db->bufsize < bufsize) { free(db->buf); db->buf = malloc(bufsize); db->bufsize = bufsize; }"
"struct prepare_args *a=db->buf;"
"a->stmt = NULL;"
"a->db = db->cnx;"
"strncpy(a->sql,C_c_string(sql), sqllen); a->sql[sqllen]='\\0';"
"a->sql_len = sqllen;"
"a->offset = offset;"
"return(a);"
))
(define sqlite3-start-prepare
(foreign-lambda* void ((<sqlite3-prepare-args> s) (c-pointer callback))
"start_asynchronous_request(pthread_sqlite3_prepare, s, callback);"))
(define (sqlite3-prepare db sql offset)
(call-with-values
(lambda ()
(sqlite3-run-fn
(sqlite3-database-callback db)
(prepare-args (sqlite3-database-raw-pointer db) sql (string-length sql) offset)
sqlite3-start-prepare))
(lambda (rc param)
(if (eqv? rc SQLITE_OK)
(let ((stmt ((foreign-lambda*
<raw-sqlite3-statement>
((<sqlite3-prepare-args> a))
"return(a->stmt);") param))
(n ((foreign-lambda*
integer
((<sqlite3-prepare-args> a))
"return(a->tail);") param)))
;; ((foreign-lambda* void ((<sqlite3-prepare-args> a)) "free(a);") param)
(values stmt n))
(begin
;; ((foreign-lambda* void ((<sqlite3-prepare-args> a)) "free(a);") param)
(raise (condition (&message (message (sqlite3-error-message db))))))))))
(define sqlite3-db-handle
(foreign-lambda void "sqlite3_db_handle" <raw-sqlite3-statement>))
(define sqlite3-start-step
(foreign-lambda* void ((<raw-sqlite3-statement> s) (c-pointer callback))
"start_asynchronous_request((askemos_request_function_t)sqlite3_step, s, callback);"))
(define sqlite3-column-count
(foreign-lambda integer "sqlite3_column_count" <raw-sqlite3-statement>))
(define sqlite3-column-name
(foreign-lambda c-string "sqlite3_column_name" <raw-sqlite3-statement> integer))
(define (sqlite3-columns st)
(let ((n (sqlite3-column-count st)))
(let loop ((i 0))
(if (eqv? i n) '()
(cons (sqlite3-column-name st i) (loop (add1 i)))))))
;;;
;;; Return a list of lists
;;;
(define sqlite3-empty-result '#(#()))
(define (sqlite3-bind . args)
(error "NYI: arguments to sqlite3-exec"))
(define-foreign-variable SQLITE_INTEGER int "SQLITE_INTEGER")
(define-foreign-variable SQLITE_FLOAT int "SQLITE_FLOAT")
(define-foreign-variable SQLITE_NULL int "SQLITE_NULL")
(define-foreign-variable SQLITE_TEXT int "SQLITE_TEXT")
(define-foreign-variable SQLITE_BLOB int "SQLITE_BLOB")
(define sqlite3-column-type
(foreign-lambda int "sqlite3_column_type" <raw-sqlite3-statement> integer))
(define sqlite3-column-int64
(foreign-lambda integer64 "sqlite3_column_int64" <raw-sqlite3-statement> integer))
(define sqlite3-column-float
(foreign-lambda double "sqlite3_column_double" <raw-sqlite3-statement> integer))
(define sqlite3-column-text
(foreign-lambda c-string "sqlite3_column_text" <raw-sqlite3-statement> integer))
(define (sqlite3-column-null x i) #f)
(define sqlite3-values
(let ((l `((,SQLITE_INTEGER . ,sqlite3-column-int64)
(,SQLITE_FLOAT . ,sqlite3-column-float)
(,SQLITE_NULL . ,sqlite3-column-null)
(,SQLITE_TEXT . ,sqlite3-column-text)
(,SQLITE_BLOB . ,sqlite3-column-text))))
(lambda (st)
(let ((n (sqlite3-column-count st)))
(let loop ((i 0))
(if (eqv? i n) '()
(cons ((cdr (assq (sqlite3-column-type st i) l)) st i)
(loop (add1 i)))))))))
(define (sqlite3-for-each db s fn)
(do ((exit #f))
(exit #t)
(receive
(rc s) (sqlite3-run-fn (sqlite3-database-callback db) s sqlite3-start-step)
(cond
((eqv? rc SQLITE_ROW) (apply fn (sqlite3-values s)))
((eqv? rc SQLITE_DONE) (set! exit #t) #f)
(else (raise (condition (&message (message (sqlite3-error-message db))))))))))
(define (sqlite3-exec db stmt . args)
(let loop ((n 0)
(r0 sqlite3-empty-result))
(if (fx< n (string-length stmt))
(call-with-values (lambda () (sqlite3-prepare db stmt n))
(lambda (p n)
(if p
(let ((r '()))
;;
(begin
(if (pair? args) (apply sqlite3-bind p args))
;;
(guard
(ex (else (sqlite3-finalize db p)
(raise ex)))
(sqlite3-for-each
db p
(lambda args
(set! r (cons (list->vector args) r)))))
(let ((r0 (list->vector
(cons
(list->vector (sqlite3-columns p))
(reverse! r)))))
(sqlite3-finalize db p)
(loop n r0))))
r0)))
r0)))
(define sqlite3-vfs-handler #f)
(define-foreign-variable CB_IO_TYPE_BLKSZ int "CB_IO_TYPE_BLKSZ")
(define-foreign-variable CB_IO_TYPE_FSIZE int "CB_IO_TYPE_FSIZE")
(define-foreign-variable CB_IO_TYPE_READ int "CB_IO_TYPE_READ")
(define-foreign-variable CB_IO_TYPE_WRITE int "CB_IO_TYPE_WRITE")
(define-foreign-variable CB_IO_TYPE_TRUNC int "CB_IO_TYPE_TRUNC")
(define-foreign-variable CB_IO_TYPE_CLOSE int "CB_IO_TYPE_CLOSE")
(define cba-buf
(foreign-lambda* c-pointer ((<sqlite3-callback-args> arg)) "return(arg->buf);"))
(define cba-op
(foreign-lambda* integer ((<sqlite3-callback-args> arg)) "return(arg->op);"))
(define set-cba-op!
(foreign-lambda* void ((<sqlite3-callback-args> arg) (integer op)) "arg->op = op;"))
(define cba-size
(foreign-lambda* integer ((<sqlite3-callback-args> arg)) "return(arg->size);"))
(define cba-amount
(foreign-lambda* integer ((<sqlite3-callback-args> arg)) "return(arg->amount);"))
(define cba-offset
(foreign-lambda* integer ((<sqlite3-callback-args> arg)) "return(arg->offset);"))
(define set-cba-offset!
(foreign-lambda* void ((<sqlite3-callback-args> arg) (integer off)) "arg->offset = off;"))
(define (callback-wrapper arg)
(guard
(ex (else (logerr "callback-wrapper failed ~a\n" (condition->string ex))))
((foreign-lambda* void ((<sqlite3-callback-args> arg)) "pthread_mutex_lock(&arg->mux);") arg)
(let ((op (cba-op arg))
(ref ((foreign-lambda* scheme-object ((<sqlite3-callback-args> arg)) "return(CHICKEN_gc_root_ref(arg->ref));") arg)))
(define opaque (vector-ref ref 0))
(cond
((eq? op CB_IO_TYPE_READ)
(let ((r ((vector-ref ref 3)
opaque (cba-buf arg) (cba-amount arg) (cba-offset arg))))
(set-cba-op! arg
(case r
((SQLITE_OK) SQLITE_OK)
((SQLITE_IOERR_SHORT_READ) SQLITE_IOERR_SHORT_READ)
(else SQLITE_ERROR)))))
((eq? op CB_IO_TYPE_WRITE)
(let ((r ((vector-ref ref 4)
opaque (cba-buf arg) (cba-amount arg) (cba-offset arg))))
(set-cba-op! arg SQLITE_OK)))
((eq? op CB_IO_TYPE_TRUNC)
(let ((r ((vector-ref ref 5) opaque (cba-offset arg))))
(set-cba-op! arg SQLITE_OK)))
((eq? op CB_IO_TYPE_CLOSE)
(set-cba-op! arg (if ((vector-ref ref 6) opaque) SQLITE_OK SQLITE_ERROR)))
((eq? op CB_IO_TYPE_FSIZE)
(let ((s ((vector-ref ref 2) opaque)))
(set-cba-op! arg SQLITE_OK)
(set-cba-offset! arg s)))
((eq? op CB_IO_TYPE_BLKSZ)
(let ((s ((vector-ref ref 1) opaque)))
(set-cba-op! arg SQLITE_OK)
(set-cba-offset! arg s)))
(else (logerr "callback-wrapper unknown request ~a\n" op)))))
((foreign-lambda*
integer ((<sqlite3-callback-args> arg))
"pthread_mutex_unlock(&arg->mux);"
"return(pthread_cond_signal(&arg->cond));")
arg))
((foreign-lambda void "callback_sqlite3_vfs_init" c-pointer c-pointer)
(make-gc-root callback-wrapper)
(make-gc-root
(foreign-lambda* <sqlite3-callback-args> (((c-pointer <sqlite3-callback-args>) arg)) "return(*arg);")))
)
(import (prefix sqlite3 m:))
(define sql-result? m:sql-result?)
(define sql-field m:sql-field)
(define sql-index m:sql-index)
(define sql-with-tupels%-fold-left m:sql-with-tupels%-fold-left)
(define sqlite3-exec m:sqlite3-exec)
(define sql-close m:sql-close)
(define sql-value m:sql-value)
(define sql-connect m:sql-connect)
(define sql-with-tupels m:sql-with-tupels)
(define sql-ref m:sql-ref)
(define sqlite3-statement-name m:sqlite3-statement-name)
(define sqlite3-open-restricted-ro m:sqlite3-open-restricted-ro)
(define sqlite3-statement-container m:sqlite3-statement-container)
(define sqlite3-database-prep-cache m:sqlite3-database-prep-cache)
(define sqlite3-database-name m:sqlite3-database-name)
(define sqlite3-error? m:sqlite3-error?)
(define sqlite3-changes m:sqlite3-changes)
(define sqlite3-statement? m:sqlite3-statement?)
(define sqlite3-statement-raw-pointer m:sqlite3-statement-raw-pointer)
(define sqlite3-database-open-statements m:sqlite3-database-open-statements)
(define sqlite3-error-cond m:sqlite3-error-cond)
(define sqlite3-open m:sqlite3-open)
(define sqlite3-close m:sqlite3-close)
(define sqlite3-open-restricted m:sqlite3-open-restricted)
(define sqlite3-error-args m:sqlite3-error-args)
(define make-sqlite3-statement m:make-sqlite3-statement)
_______________________________________________
Chicken-users mailing list
[email protected]
http://lists.nongnu.org/mailman/listinfo/chicken-users