Changeset: 9cb1d66ebc89 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=9cb1d66ebc89
Modified Files:
clients/R/MonetDB.R/NAMESPACE
clients/R/MonetDB.R/R/monetdb.R
clients/R/MonetDB.R/src/mapi.c
Branch: default
Log Message:
R Connector
- Switched to external pointers for socket connection ID
- Fixed timeouts
- Fixed SO/DLL loading bug
diffs (236 lines):
diff --git a/clients/R/MonetDB.R/NAMESPACE b/clients/R/MonetDB.R/NAMESPACE
--- a/clients/R/MonetDB.R/NAMESPACE
+++ b/clients/R/MonetDB.R/NAMESPACE
@@ -1,4 +1,4 @@
-import(DBI,utils,stats)
+import(DBI,utils,stats,digest,methods)
# export only driver constructor, everything else is DBI stuff..
export(MonetDB,MonetR,MonetDBR,MonetDB.R)
diff --git a/clients/R/MonetDB.R/R/monetdb.R b/clients/R/MonetDB.R/R/monetdb.R
--- a/clients/R/MonetDB.R/R/monetdb.R
+++ b/clients/R/MonetDB.R/R/monetdb.R
@@ -1,6 +1,11 @@
require(DBI)
require(digest)
+.onLoad <- function(lib, pkg) {
+ library.dynam( "MonetDB.R", pkg, lib )
+ .Call("mapiInit",PACKAGE="MonetDB.R")
+}
+
# TODO: make these values configurable in the call to dbConnect
DEBUG_IO <- FALSE
DEBUG_QUERY <- FALSE
@@ -59,17 +64,17 @@ setMethod("dbConnect", "MonetDBDriver",
#
blocking = TRUE, open="r+b",timeout = 5 )
# this goes to src/mapi.c
- socket <-
socket <<- .Call("mapiConnect",host,port,5)
+ socket <-
socket <<- .Call("mapiConnect",host,port,5,PACKAGE="MonetDB.R")
# authenticate
.monetAuthenticate(socket,dbname,user,password)
# test the connection to make sure it works before
.mapiWrite(socket,"sSELECT 42;"); .mapiRead(socket)
#close(socket)
-
.Call("mapiDisconnect",socket)
+
.Call("mapiDisconnect",socket,PACKAGE="MonetDB.R")
break
}, error = function(e) {
if
("connection" %in% class(socket)) {
-
.Call("mapiDisconnect",socket)
+
.Call("mapiDisconnect",socket,PACKAGE="MonetDB.R")
}
cat(paste0("Server not ready(",e$message,"), retrying (ESC or CTRL+C to
abort)\n"))
Sys.sleep(1)
@@ -81,7 +86,7 @@ setMethod("dbConnect", "MonetDBDriver",
# make new socket with user-specified timeout
#socket <- socket <<- socketConnection(host = host,
port = port,
# blocking = TRUE, open="r+b",timeout = timeout)
- socket <- socket <<- .Call("mapiConnect",host,port,5)
+ socket <- socket <<-
.Call("mapiConnect",host,port,timeout,PACKAGE="MonetDB.R")
.monetAuthenticate(socket,dbname,user,password)
connenv <- new.env(parent=emptyenv())
connenv$lock <- 0
@@ -94,10 +99,10 @@ setMethod("dbConnect", "MonetDBDriver",
### MonetDBConnection, #monetdb_mapi_conn
-setClass("MonetDBConnection",
representation("DBIConnection",socket="monetdb_mapi_conn",connenv="environment",fetchSize="integer"))
+setClass("MonetDBConnection",
representation("DBIConnection",socket="externalptr",connenv="environment",fetchSize="integer"))
setMethod("dbDisconnect", "MonetDBConnection", def=function(conn, ...) {
- .Call("mapiDisconnect",conn@socket)
+ .Call("mapiDisconnect",conn@socket,PACKAGE="MonetDB.R")
TRUE
})
@@ -557,9 +562,9 @@ REPLY_SIZE <- 100 # Apparently, -1 me
}
.mapiRead <- function(con) {
- if (!identical(class(con)[[1]],"monetdb_mapi_conn"))
- stop("I can only be called with a monetdb_mapi_conn object as
parameter.")
- respstr <- .Call("mapiRead",con)
+ if (!identical(class(con)[[1]],"externalptr"))
+ stop("I can only be called with a MonetDB connection object as
parameter.")
+ respstr <- .Call("mapiRead",con,PACKAGE="MonetDB.R")
if (DEBUG_IO) {
dstr <- respstr
if (nchar(dstr) > 300) {
@@ -571,11 +576,11 @@ REPLY_SIZE <- 100 # Apparently, -1 me
}
.mapiWrite <- function(con,msg) {
- if (!identical(class(con)[[1]],"monetdb_mapi_conn"))
- stop("I can only be called with a monetdb_mapi_conn object as
parameter.")
+ if (!identical(class(con)[[1]],"externalptr"))
+ stop("I can only be called with a MonetDB connection object as
parameter.")
if (DEBUG_IO) cat(paste("TX: '",msg,"'\n",sep=""))
- .Call("mapiWrite",con,msg)
+ .Call("mapiWrite",con,msg,PACKAGE="MonetDB.R")
return (NULL)
}
diff --git a/clients/R/MonetDB.R/src/mapi.c b/clients/R/MonetDB.R/src/mapi.c
--- a/clients/R/MonetDB.R/src/mapi.c
+++ b/clients/R/MonetDB.R/src/mapi.c
@@ -21,13 +21,34 @@
#define BLOCKSIZE 8190
#define BUFSIZE BLOCKSIZE+1
#define SOCKET int
-#define SOCK_ATTR "mapi_conn_do_not_touch"
-#define CONN_CLASS "monetdb_mapi_conn"
#define TRUE 1
#define FALSE 0
#define ALLOCSIZE 1048576 // 1 MB
#define DEBUG FALSE
+// reference tricks taken from http://homepage.stat.uiowa.edu/~luke/R/simpleref.html#NWarqU3-KrSQa-1
+static SEXP MAPI_type_tag;
+
+#define CHECK_MAPI_SOCK(s) do { \
+ if (TYPEOF(s) != EXTPTRSXP || \
+ R_ExternalPtrTag(s) != MAPI_type_tag) \
+ error("bad socket"); \
+} while (0)
+
+SEXP mapiInit(void) {
+ MAPI_type_tag = install("MAPI_TYPE_TAG");
+ return R_NilValue;
+}
+
+SEXP mapiDisconnect(SEXP conn) {
+ CHECK_MAPI_SOCK(conn);
+ SOCKET *sock = R_ExternalPtrAddr(conn);
+ shutdown(*sock, 2);
+ R_ClearExternalPtr(conn);
+ free(sock);
+ return R_NilValue;
+}
+
SEXP mapiConnect(SEXP host, SEXP port, SEXP timeout) {
// be a bit paranoid about the parameters
assert(IS_CHARACTER(host));
@@ -45,7 +66,7 @@ SEXP mapiConnect(SEXP host, SEXP port, S
assert(portval > 0 && portval < 65535);
assert(timeoutval > 0);
- SEXP connobj, class, attr;
+ SEXP connobj;
SOCKET sock;
struct addrinfo hints;
@@ -69,9 +90,9 @@ SEXP mapiConnect(SEXP host, SEXP port, S
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
+ // resolve dns name
char portvalstr[15];
sprintf(portvalstr, "%d", portval);
-
int s = getaddrinfo(hostval, portvalstr, &hints, &result);
if (s != 0) {
error("ERROR, failed to resolve host %s", hostval);
@@ -101,7 +122,7 @@ SEXP mapiConnect(SEXP host, SEXP port, S
if (DEBUG) {
printf("II: Connected to %s:%s\n", hostval,
portvalstr);
}
- break; /* Success */
+ break; // Profit
}
close(sock);
}
@@ -111,35 +132,24 @@ SEXP mapiConnect(SEXP host, SEXP port, S
}
freeaddrinfo(result);
- if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &sto,
sizeof(sto))
- < 0)
- error("setsockopt failed");
- if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *) &sto,
sizeof(sto))
- < 0)
- error("setsockopt failed");
+ // get the socket number off the stack so that we can use R external pointers for passing it around
+ // external pointers are very useful as they can have finalizers, in our case, mapiDisconnect()
+ void * sockaddr = malloc(sizeof(int));
+ if (sockaddr == NULL) {
+ error("Error in malloc() for a single integer, srsly?");
+ }
+ memcpy(sockaddr, &sock, sizeof(int));
- // construct a r object of class monetdb_mapi_conn with an attribute holding the connection id
- PROTECT(connobj = ScalarInteger(1));
- PROTECT(attr = ScalarInteger(1));
- PROTECT(class = allocVector(STRSXP, 1));
- SET_STRING_ELT(class, 0, mkChar(CONN_CLASS));
- classgets(connobj, class);
- INTEGER_POINTER(attr)[0] = sock;
- setAttrib(connobj, install(SOCK_ATTR), attr);
- UNPROTECT(3);
+ PROTECT(connobj = R_MakeExternalPtr(sockaddr, MAPI_type_tag,
R_NilValue));
+ R_RegisterCFinalizerEx(connobj, (R_CFinalizer_t) mapiDisconnect, 0);
+ CHECK_MAPI_SOCK(connobj);
+ UNPROTECT(1);
return connobj;
}
-SEXP mapiDisconnect(SEXP conn) {
- SOCKET sock = INTEGER_POINTER(
- AS_INTEGER(getAttrib(conn, install(SOCK_ATTR))))[0];
- shutdown(sock, 2);
- return R_NilValue;
-}
-
SEXP mapiRead(SEXP conn) {
- SOCKET sock = INTEGER_POINTER(
- AS_INTEGER(getAttrib(conn, install(SOCK_ATTR))))[0];
+ CHECK_MAPI_SOCK(conn);
+ SOCKET sock = *((SOCKET*) R_ExternalPtrAddr(conn));
SEXP lines;
char read_buf[BUFSIZE];
@@ -157,7 +167,6 @@ SEXP mapiRead(SEXP conn) {
while (!block_final) {
// read block header and extract block length and final bit from header
// this assumes little-endianness (so sue me)
-
n = recv(sock, (void *) &header, 2, MSG_WAITALL);
if (n != 2) {
error("ERROR reading MAPI block header (%d)", n);
@@ -207,11 +216,12 @@ SEXP mapiRead(SEXP conn) {
}
SEXP mapiWrite(SEXP conn, SEXP message) {
+ CHECK_MAPI_SOCK(conn);
+ SOCKET sock = *((SOCKET*) R_ExternalPtrAddr(conn));
+
assert(IS_CHARACTER(message));
assert(GET_LENGTH(message) == 1);
- SOCKET sock = INTEGER_POINTER(
- AS_INTEGER(getAttrib(conn, install(SOCK_ATTR))))[0];
const char *messageval = CHAR(STRING_ELT(message, 0));
assert(strlen(messageval) > 0);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list