Changeset: ed5514bb7e17 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ed5514bb7e17
Added Files:
        clients/R/MonetDB.R/src/Makevars.win
        clients/R/MonetDB.R/src/profiler.c
Modified Files:
        clients/R/MonetDB.R/R/dbi.R
        clients/R/MonetDB.R/src/mapisplit.c
        monetdb5/optimizer/opt_querylog.c
Branch: default
Log Message:

R Connector: Progress bar (see http://homepages.cwi.nl/~hannes/o2.gif)


diffs (truncated from 427 to 300 lines):

diff --git a/clients/R/MonetDB.R/R/dbi.R b/clients/R/MonetDB.R/R/dbi.R
--- a/clients/R/MonetDB.R/R/dbi.R
+++ b/clients/R/MonetDB.R/R/dbi.R
@@ -120,19 +120,18 @@ setMethod("dbConnect", "MonetDBDriver", 
   connenv$deferred <- list()
   connenv$exception <- list()
   
-  conn <- new("MonetDBConnection", socket=socket, connenv=connenv, Id=-1L)
+  conn <- new("MonetDBConnection", socket=socket, connenv=connenv)
   if (getOption("monetdb.sequential", F)) {
     message("MonetDB: Switching to single-threaded query execution.")
     dbSendQuery(conn, "set optimizer='sequential_pipe'")
   }
 
   # enable profiler, we use a MAL connection for this
-  if (getOption("monetdb.profile", F)) {
+  if (getOption("monetdb.profile", T)) {
     msocket <- .mapiConnect(host, port, timeout) 
     .mapiAuthenticate(msocket, dbname, user, password, language="mal")
     .profiler_enable(msocket)
     .mapiDisconnect(msocket);
-    message("Enabled profiler")
   }
   
   return(conn)
@@ -143,7 +142,7 @@ valueClass="MonetDBConnection")
 
 ### MonetDBConnection
 setClass("MonetDBConnection", representation("DBIConnection", socket="ANY", 
-                                             connenv="environment", fetchSize="integer", Id="integer"))
+                                             connenv="environment"))
 
 setMethod("dbGetInfo", "MonetDBConnection", def=function(dbObj, ...) {
   envdata <- dbGetQuery(dbObj, "SELECT name, value from sys.env()")
@@ -238,6 +237,9 @@ setMethod("dbSendQuery", signature(conn=
   conn@connenv$exception <- list()
   env <- NULL
   if (getOption("monetdb.debug.query", F))  message("QQ: '", statement, "'")
+  # make the progress bar wait for querylog.define
+  if (getOption("monetdb.profile", T))  .profiler_arm()
+
   # the actual request
   mresp <- .mapiRequest(conn, paste0("s", statement, "\n;"), async=async)
   resp <- .mapiParseResponse(mresp)
@@ -526,13 +528,15 @@ setMethod("dbFetch", signature(res="Mone
   }
   
   # if our tuple cache in res@env$data does not contain n rows, we fetch from server until it does
-  if (length(res@env$data) < n) {
+  while (length(res@env$data) < n) {
     cresp <- .mapiParseResponse(.mapiRequest(res@env$conn, paste0("Xexport ", .mapiLongInt(info$id),
-                                                                  " ", .mapiLongInt(info$index), " ", .mapiLongInt(n-length(res@env$data)))))
+                                                                  " ", .mapiLongInt(info$index), " ", .mapiLongInt(min(10000,n-length(res@env$data))))))
     stopifnot(cresp$type == Q_BLOCK && cresp$rows > 0)
     
     res@env$data <- c(res@env$data, cresp$tuples)
     info$index <- info$index + cresp$rows
+    #print(paste0(length(res@env$data), " of ", info$rows));
+    if (getOption("monetdb.profile", T))  .profiler_progress(length(res@env$data), n)
   }
   
   # convert tuple string vector into matrix so we can access a single column efficiently
@@ -571,6 +575,8 @@ setMethod("dbFetch", signature(res="Mone
   attr(df, "row.names") <- c(NA_integer_, length(df[[1]]))
   class(df) <- "data.frame"
   
+  if (getOption("monetdb.profile", T))  .profiler_clear()
+
   return(df)
 })
 
diff --git a/clients/R/MonetDB.R/src/Makevars.win b/clients/R/MonetDB.R/src/Makevars.win
new file mode 100644
--- /dev/null
+++ b/clients/R/MonetDB.R/src/Makevars.win
@@ -0,0 +1,1 @@
+PKG_LIBS= -lws2_32 -lpthread
\ No newline at end of file
diff --git a/clients/R/MonetDB.R/src/mapisplit.c b/clients/R/MonetDB.R/src/mapisplit.c
--- a/clients/R/MonetDB.R/src/mapisplit.c
+++ b/clients/R/MonetDB.R/src/mapisplit.c
@@ -9,6 +9,21 @@ typedef enum {
        INQUOTES, ESCAPED, INTOKEN, INCRAP
 } mapi_line_chrstate;
 
+void mapi_unescape(char* in, char* out) {
+       char escaped = 0;
+       size_t i, o = 0;
+
+       for (i=0; i < strlen(in); i++) {
+               if (!escaped && in[i] == '\\') {
+                       escaped = 1;
+                       continue;
+               }
+               out[o++] = in[i];
+               escaped = 0;
+       }
+       out[o] = '\0';
+}
+
 void mapi_line_split(char* line, char** out, size_t ncols) {
        int cCol = 0;
        int tokenStart = 2;
@@ -89,21 +104,12 @@ SEXP mapi_split(SEXP mapiLinesVector, SE
 
        int cRow;
        int cCol;
-       int tokenStart;
-       int curPos;
-       int endQuote;
        char* elems[cols];
 
        for (cRow = 0; cRow < rows; cRow++) {
                const char *rval = CHAR(STRING_ELT(mapiLinesVector, cRow));
                char *val = strdup(rval);
-               int linelen = strlen(val);
-
                cCol = 0;
-               tokenStart = 2;
-               curPos = 0;
-               endQuote = 0;
-
                mapi_line_split(val, elems, cols);
 
                for (cCol = 0; cCol < cols; cCol++) {
diff --git a/clients/R/MonetDB.R/src/profiler.c b/clients/R/MonetDB.R/src/profiler.c
new file mode 100644
--- /dev/null
+++ b/clients/R/MonetDB.R/src/profiler.c
@@ -0,0 +1,282 @@
+#include <assert.h>
+#include <string.h>
+#include <ctype.h>
+#include <errno.h>
+
+#include <stdio.h>
+#include <pthread.h>
+#include <signal.h>
+#include <unistd.h>
+
+#include <sys/types.h>
+#include <sys/fcntl.h>
+#include <sys/time.h>
+
+#include <R.h>
+#include <Rdefines.h>
+
+#ifdef __WIN32__
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#else
+#include <sys/socket.h>
+#include <netinet/in.h>
+#endif
+
+// trace output format and columns
+#define TRACE_NCOLS 14
+#define TRACE_COL_QUERYID 2
+#define TRACE_COL_STATEFL 4
+#define TRACE_COL_MALSTMT 13
+#define TRACE_MAL_MAXPARAMS 3 // we don't need more than 3 params here
+
+// size of the progress bar in characters
+#define PROFILER_BARSYMB 60
+
+static int profiler_socket;
+static pthread_t profiler_pthread;
+static int profiler_needcleanup = 0;
+static int profiler_armed = 0;
+
+static char* profiler_symb_query = "X";
+static char* profiler_symb_trans = "V";
+static char* profiler_symb_bfree = "_";
+static char* profiler_symb_bfull = "#";
+
+int strupp(char *s) {
+    int i;
+    for (i = 0; i < strlen(s); i++)
+        s[i] = toupper(s[i]);
+    return i;
+}
+
+/* standalone MAL function call parser */
+typedef enum {
+       ASSIGNMENT, FUNCTION, PARAM, QUOTED, ESCAPED
+} mal_statement_state;
+
+typedef struct {
+       char* assignment;
+       char* function;
+       unsigned short nparams;
+       char** params;
+} mal_statement;
+
+void mal_statement_split(char* stmt, mal_statement *out, size_t maxparams) {
+       #define TRIM(str) \
+       while (str[0] == ' ' || str[0] == '"') str++; endPos = curPos - 1; \
+       while (stmt[endPos] == ' ' || stmt[endPos] == '"') { stmt[endPos] = '\0'; endPos--; }
+
+       unsigned int curPos, endPos, paramStart = 0, stmtLen;
+       mal_statement_state state = ASSIGNMENT;
+
+       out->assignment = stmt;
+       out->function = stmt;
+       out->nparams = 0;
+
+       stmtLen = strlen(stmt);
+       for (curPos = 0; curPos < stmtLen; curPos++) {
+               char chr = stmt[curPos];
+               switch (state) {
+               case ASSIGNMENT:
+                       if (chr == ':' && stmt[curPos+1] == '=') {
+                               stmt[curPos] = '\0';
+                               TRIM(out->assignment)
+                               state = FUNCTION;
+                               out->function = &stmt[curPos + 2];
+                       }
+                       break;
+               
+               case FUNCTION:
+                       if (chr == '(' || chr == ';') {
+                               stmt[curPos] = '\0';
+                               TRIM(out->function)
+                               state = PARAM;
+                               paramStart = curPos+1;
+                       }
+                       break;
+
+               case PARAM:
+                       if (chr == '"') {
+                               state = QUOTED;
+                       }
+                       if (chr == ',' || chr == ')') {
+                               stmt[curPos] = '\0';
+                               out->params[out->nparams] = &stmt[paramStart];
+                               TRIM(out->params[out->nparams])
+                               out->nparams++;
+                               if (out->nparams >= maxparams) {
+                                       return;
+                               }
+                               paramStart = curPos+1;
+                       }
+                       break;
+
+               case QUOTED:
+                       if (chr == '"') {
+                               state = PARAM;
+                               break;
+                       }
+                       if (chr == '\\') {
+                               state = ESCAPED;
+                               break;
+                       }
+                       break;
+
+               case ESCAPED:
+                       state = QUOTED;
+                       break;
+               }
+       }
+}
+
+// from mapisplit.c, the trace tuple format is similar(*) to the mapi tuple format
+void mapi_line_split(char* line, char** out, size_t ncols);
+void mapi_unescape(char* in, char* out);
+
+unsigned long profiler_tsms() {
+       unsigned long ret = 0;
+       struct timeval tv;
+       gettimeofday(&tv, NULL);
+       ret += tv.tv_sec * 1000;
+       ret += tv.tv_usec / 1000;
+       return ret;
+}
+
+// clear line and overwrite with spaces
+void profiler_clearbar() {
+       if (!profiler_needcleanup) return;
+       for (int bs=0; bs < PROFILER_BARSYMB + 3 + 6; bs++) printf("\b \b"); 
+       profiler_needcleanup = 0;
+}
+
+void profiler_renderbar(size_t state, size_t total, char *symbol) {
+       int bs;
+       unsigned short percentage, symbols;
+       percentage = (unsigned short) round((1.0 * 
+               state / total) * 100);
+       symbols = PROFILER_BARSYMB*(percentage/100.0);
+
+       profiler_clearbar();
+       profiler_needcleanup = 1;
+       printf("%s ", symbol);
+       for (bs=0; bs < symbols; bs++) printf("%s", profiler_symb_bfull);
+       for (bs=0; bs < PROFILER_BARSYMB-symbols; bs++) printf("%s", profiler_symb_bfree);
+
+       printf(" %3u%% ", percentage);
+       fflush(stdout);
+}
+
+void *profiler_thread() {
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to