Changeset: ed5514bb7e17 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ed5514bb7e17 Added Files: clients/R/MonetDB.R/src/Makevars.win clients/R/MonetDB.R/src/profiler.c Modified Files: clients/R/MonetDB.R/R/dbi.R clients/R/MonetDB.R/src/mapisplit.c monetdb5/optimizer/opt_querylog.c Branch: default Log Message:
R Connector: Progress bar (see http://homepages.cwi.nl/~hannes/o2.gif) diffs (truncated from 427 to 300 lines): diff --git a/clients/R/MonetDB.R/R/dbi.R b/clients/R/MonetDB.R/R/dbi.R --- a/clients/R/MonetDB.R/R/dbi.R +++ b/clients/R/MonetDB.R/R/dbi.R @@ -120,19 +120,18 @@ setMethod("dbConnect", "MonetDBDriver", connenv$deferred <- list() connenv$exception <- list() - conn <- new("MonetDBConnection", socket=socket, connenv=connenv, Id=-1L) + conn <- new("MonetDBConnection", socket=socket, connenv=connenv) if (getOption("monetdb.sequential", F)) { message("MonetDB: Switching to single-threaded query execution.") dbSendQuery(conn, "set optimizer='sequential_pipe'") } # enable profiler, we use a MAL connection for this - if (getOption("monetdb.profile", F)) { + if (getOption("monetdb.profile", T)) { msocket <- .mapiConnect(host, port, timeout) .mapiAuthenticate(msocket, dbname, user, password, language="mal") .profiler_enable(msocket) .mapiDisconnect(msocket); - message("Enabled profiler") } return(conn) @@ -143,7 +142,7 @@ valueClass="MonetDBConnection") ### MonetDBConnection setClass("MonetDBConnection", representation("DBIConnection", socket="ANY", - connenv="environment", fetchSize="integer", Id="integer")) + connenv="environment")) setMethod("dbGetInfo", "MonetDBConnection", def=function(dbObj, ...) 
{ envdata <- dbGetQuery(dbObj, "SELECT name, value from sys.env()") @@ -238,6 +237,9 @@ setMethod("dbSendQuery", signature(conn= conn@connenv$exception <- list() env <- NULL if (getOption("monetdb.debug.query", F)) message("QQ: '", statement, "'") + # make the progress bar wait for querylog.define + if (getOption("monetdb.profile", T)) .profiler_arm() + # the actual request mresp <- .mapiRequest(conn, paste0("s", statement, "\n;"), async=async) resp <- .mapiParseResponse(mresp) @@ -526,13 +528,15 @@ setMethod("dbFetch", signature(res="Mone } # if our tuple cache in res@env$data does not contain n rows, we fetch from server until it does - if (length(res@env$data) < n) { + while (length(res@env$data) < n) { cresp <- .mapiParseResponse(.mapiRequest(res@env$conn, paste0("Xexport ", .mapiLongInt(info$id), - " ", .mapiLongInt(info$index), " ", .mapiLongInt(n-length(res@env$data))))) + " ", .mapiLongInt(info$index), " ", .mapiLongInt(min(10000,n-length(res@env$data)))))) stopifnot(cresp$type == Q_BLOCK && cresp$rows > 0) res@env$data <- c(res@env$data, cresp$tuples) info$index <- info$index + cresp$rows + #print(paste0(length(res@env$data), " of ", info$rows)); + if (getOption("monetdb.profile", T)) .profiler_progress(length(res@env$data), n) } # convert tuple string vector into matrix so we can access a single column efficiently @@ -571,6 +575,8 @@ setMethod("dbFetch", signature(res="Mone attr(df, "row.names") <- c(NA_integer_, length(df[[1]])) class(df) <- "data.frame" + if (getOption("monetdb.profile", T)) .profiler_clear() + return(df) }) diff --git a/clients/R/MonetDB.R/src/Makevars.win b/clients/R/MonetDB.R/src/Makevars.win new file mode 100644 --- /dev/null +++ b/clients/R/MonetDB.R/src/Makevars.win @@ -0,0 +1,1 @@ +PKG_LIBS= -lws2_32 -lpthread \ No newline at end of file diff --git a/clients/R/MonetDB.R/src/mapisplit.c b/clients/R/MonetDB.R/src/mapisplit.c --- a/clients/R/MonetDB.R/src/mapisplit.c +++ b/clients/R/MonetDB.R/src/mapisplit.c @@ -9,6 +9,21 @@ typedef 
enum { INQUOTES, ESCAPED, INTOKEN, INCRAP } mapi_line_chrstate; +void mapi_unescape(char* in, char* out) { + char escaped = 0; + size_t i, o = 0; + + for (i=0; i < strlen(in); i++) { + if (!escaped && in[i] == '\\') { + escaped = 1; + continue; + } + out[o++] = in[i]; + escaped = 0; + } + out[o] = '\0'; +} + void mapi_line_split(char* line, char** out, size_t ncols) { int cCol = 0; int tokenStart = 2; @@ -89,21 +104,12 @@ SEXP mapi_split(SEXP mapiLinesVector, SE int cRow; int cCol; - int tokenStart; - int curPos; - int endQuote; char* elems[cols]; for (cRow = 0; cRow < rows; cRow++) { const char *rval = CHAR(STRING_ELT(mapiLinesVector, cRow)); char *val = strdup(rval); - int linelen = strlen(val); - cCol = 0; - tokenStart = 2; - curPos = 0; - endQuote = 0; - mapi_line_split(val, elems, cols); for (cCol = 0; cCol < cols; cCol++) { diff --git a/clients/R/MonetDB.R/src/profiler.c b/clients/R/MonetDB.R/src/profiler.c new file mode 100644 --- /dev/null +++ b/clients/R/MonetDB.R/src/profiler.c @@ -0,0 +1,282 @@ +#include <assert.h> +#include <string.h> +#include <ctype.h> +#include <errno.h> + +#include <stdio.h> +#include <pthread.h> +#include <signal.h> +#include <unistd.h> + +#include <sys/types.h> +#include <sys/fcntl.h> +#include <sys/time.h> + +#include <R.h> +#include <Rdefines.h> + +#ifdef __WIN32__ +#include <winsock2.h> +#include <ws2tcpip.h> +#else +#include <sys/socket.h> +#include <netinet/in.h> +#endif + +// trace output format and columns +#define TRACE_NCOLS 14 +#define TRACE_COL_QUERYID 2 +#define TRACE_COL_STATEFL 4 +#define TRACE_COL_MALSTMT 13 +#define TRACE_MAL_MAXPARAMS 3 // we don't need more than 3 params here + +// size of the progress bar in characters +#define PROFILER_BARSYMB 60 + +static int profiler_socket; +static pthread_t profiler_pthread; +static int profiler_needcleanup = 0; +static int profiler_armed = 0; + +static char* profiler_symb_query = "X"; +static char* profiler_symb_trans = "V"; +static char* profiler_symb_bfree = "_"; +static 
char* profiler_symb_bfull = "#"; + +int strupp(char *s) { + int i; + for (i = 0; i < strlen(s); i++) + s[i] = toupper(s[i]); + return i; +} + +/* standalone MAL function call parser */ +typedef enum { + ASSIGNMENT, FUNCTION, PARAM, QUOTED, ESCAPED +} mal_statement_state; + +typedef struct { + char* assignment; + char* function; + unsigned short nparams; + char** params; +} mal_statement; + +void mal_statement_split(char* stmt, mal_statement *out, size_t maxparams) { + #define TRIM(str) \ + while (str[0] == ' ' || str[0] == '"') str++; endPos = curPos - 1; \ + while (stmt[endPos] == ' ' || stmt[endPos] == '"') { stmt[endPos] = '\0'; endPos--; } + + unsigned int curPos, endPos, paramStart = 0, stmtLen; + mal_statement_state state = ASSIGNMENT; + + out->assignment = stmt; + out->function = stmt; + out->nparams = 0; + + stmtLen = strlen(stmt); + for (curPos = 0; curPos < stmtLen; curPos++) { + char chr = stmt[curPos]; + switch (state) { + case ASSIGNMENT: + if (chr == ':' && stmt[curPos+1] == '=') { + stmt[curPos] = '\0'; + TRIM(out->assignment) + state = FUNCTION; + out->function = &stmt[curPos + 2]; + } + break; + + case FUNCTION: + if (chr == '(' || chr == ';') { + stmt[curPos] = '\0'; + TRIM(out->function) + state = PARAM; + paramStart = curPos+1; + } + break; + + case PARAM: + if (chr == '"') { + state = QUOTED; + } + if (chr == ',' || chr == ')') { + stmt[curPos] = '\0'; + out->params[out->nparams] = &stmt[paramStart]; + TRIM(out->params[out->nparams]) + out->nparams++; + if (out->nparams >= maxparams) { + return; + } + paramStart = curPos+1; + } + break; + + case QUOTED: + if (chr == '"') { + state = PARAM; + break; + } + if (chr == '\\') { + state = ESCAPED; + break; + } + break; + + case ESCAPED: + state = QUOTED; + break; + } + } +} + +// from mapisplit.c, the trace tuple format is similar(*) to the mapi tuple format +void mapi_line_split(char* line, char** out, size_t ncols); +void mapi_unescape(char* in, char* out); + +unsigned long profiler_tsms() { + 
unsigned long ret = 0; + struct timeval tv; + gettimeofday(&tv, NULL); + ret += tv.tv_sec * 1000; + ret += tv.tv_usec / 1000; + return ret; +} + +// clear line and overwrite with spaces +void profiler_clearbar() { + if (!profiler_needcleanup) return; + for (int bs=0; bs < PROFILER_BARSYMB + 3 + 6; bs++) printf("\b \b"); + profiler_needcleanup = 0; +} + +void profiler_renderbar(size_t state, size_t total, char *symbol) { + int bs; + unsigned short percentage, symbols; + percentage = (unsigned short) round((1.0 * + state / total) * 100); + symbols = PROFILER_BARSYMB*(percentage/100.0); + + profiler_clearbar(); + profiler_needcleanup = 1; + printf("%s ", symbol); + for (bs=0; bs < symbols; bs++) printf("%s", profiler_symb_bfull); + for (bs=0; bs < PROFILER_BARSYMB-symbols; bs++) printf("%s", profiler_symb_bfree); + + printf(" %3u%% ", percentage); + fflush(stdout); +} + +void *profiler_thread() { _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list
