Changeset: e2aa9541880b for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e2aa9541880b
Modified Files:
        ctest/tools/monetdbe/example_proxy.c
        design.txt
        monetdb5/modules/mal/remote.c
        tools/monetdbe/monetdbe.c
Branch: monetdbe-proxy
Log Message:

Work in progress: proxified monetdbe_append compiles.


diffs (truncated from 383 to 300 lines):

diff --git a/ctest/tools/monetdbe/example_proxy.c 
b/ctest/tools/monetdbe/example_proxy.c
--- a/ctest/tools/monetdbe/example_proxy.c
+++ b/ctest/tools/monetdbe/example_proxy.c
@@ -33,15 +33,18 @@ main(void)
        if (monetdbe_open(&mdbe, 
"mapi:monetdb://127.0.0.1:50000?database=devdb", &opt))
                error("Failed to open database")
 
-       if ((err = monetdbe_query(mdbe, "SELECT x, y, 1 AS some_int FROM test; 
", &result, NULL)) != NULL)
+       if ((err = monetdbe_query(mdbe, "SELECT * FROM test; ", &result, NULL)) 
!= NULL)
                error(err)
 
+       monetdbe_column* appendable_columns[2];
+
        fprintf(stdout, "Query result with %zu cols and %"PRId64" rows\n", 
result->ncols, result->nrows);
        for (int64_t r = 0; r < result->nrows; r++) {
                for (size_t c = 0; c < result->ncols; c++) {
                        monetdbe_column* rcol;
                        if ((err = monetdbe_result_fetch(result, &rcol, c)) != 
NULL)
                                error(err)
+                       appendable_columns[c] = rcol;
                        switch (rcol->type) {
                                case monetdbe_int8_t: {
                                        monetdbe_column_int8_t * col = 
(monetdbe_column_int8_t *) rcol;
@@ -91,6 +94,9 @@ main(void)
                printf("\n");
        }
 
+       if ((err = monetdbe_append(mdbe, "sys", "test", appendable_columns, 2)) 
!= NULL)
+               error(err)
+
        if ((err = monetdbe_cleanup_result(mdbe, result)) != NULL)
                error(err)
 
@@ -110,6 +116,7 @@ main(void)
        if ((err = monetdbe_execute(stmt, &result, NULL)) != NULL)
                error(err)
 
+
        fprintf(stdout, "Query result with %zu cols and %"PRId64" rows\n", 
result->ncols, result->nrows);
        for (int64_t r = 0; r < result->nrows; r++) {
                for (size_t c = 0; c < result->ncols; c++) {
diff --git a/design.txt b/design.txt
--- a/design.txt
+++ b/design.txt
@@ -80,6 +80,9 @@ function user.%temp(X1, ..., XN)
     sql.affectedRows(m, c);
 end
 
+function user.%temp2()
+    remote.put()
+
 Register this function and remotely execute it:
 remote.register(conn, user, %temp)
 remote.exec(conn, user, %temp, RB1, ..., RBN)
diff --git a/monetdb5/modules/mal/remote.c b/monetdb5/modules/mal/remote.c
--- a/monetdb5/modules/mal/remote.c
+++ b/monetdb5/modules/mal/remote.c
@@ -70,7 +70,6 @@
 #ifdef HAVE_MAPI
 
 
-
 #include "mal_exception.h"
 #include "mal_interpreter.h"
 #include "mal_function.h" /* for printFunction */
@@ -1138,7 +1137,7 @@ static str RMTput(Client cntxt, MalBlkPt
  * The implementation is based on serialisation of the block into a string
  * followed by remote parsing.
  */
-static str RMTregisterInternal(Client cntxt, const char *conn, const char 
*mod, const char *fcn)
+static str RMTregisterInternal(Client cntxt, char** fcn_id, const char *conn, 
const char *mod, const char *fcn)
 {
        str tmp, qry, msg;
        connection c;
@@ -1176,32 +1175,60 @@ static str RMTregisterInternal(Client cn
        if (mhdl)
                mapi_close_handle(mhdl);
 
+       /* get a free, typed identifier for the remote host */
+       char ident[BUFSIZ];
+       tmp = RMTgetId(ident, sym->def, getInstrPtr(sym->def, 0), 0);
+       if (tmp != MAL_SUCCEED) {
+               MT_lock_unset(&c->lock);
+               return tmp;
+       }
+       
+       *fcn_id = GDKmalloc(strlen(ident));
+       if (*fcn_id == NULL) {
+               //TODO: handle error
+       }
+
+       strcpy(*fcn_id, ident);
+
+       Symbol prg;
+
+       if ((prg = newFunction(putName(mod), putName(*fcn_id), FUNCTIONsymbol)) 
== NULL) {
+               // TODO: handle error
+               return createException(MAL, "Remote register", MAL_MALLOC_FAIL);
+       }
+       prg->def = copyMalBlk(sym->def);
+       // TODO: handle if == NULL
+       setFunctionId(getInstrPtr(prg->def, 0), putName(*fcn_id));
+
        /* make sure the program is error free */
-       msg = chkProgram(cntxt->usermodule, sym->def);
-       if ( msg == MAL_SUCCEED || sym->def->errors) {
+       msg = chkProgram(cntxt->usermodule, prg->def);
+       if ( msg != MAL_SUCCEED || prg->def->errors) {
                MT_lock_unset(&c->lock);
                throw(MAL, "remote.register",
                                "function '%s.%s' contains syntax or type 
errors",
-                               mod, fcn);
+                               mod, *fcn_id);
        }
 
-       qry = mal2str(sym->def, 0, sym->def->stop);
+       qry = mal2str(prg->def, 0, prg->def->stop);
        TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, qry);
        msg = RMTquery(&mhdl, "remote.register", c->mconn, qry);
        GDKfree(qry);
        if (mhdl)
                mapi_close_handle(mhdl);
 
+       freeSymbol(prg);
+
        MT_lock_unset(&c->lock);
        return msg;
 }
 
 static str RMTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci) {
+       char *fcn_id = *getArgReference_str(stk, pci, 0);
        const char *conn = *getArgReference_str(stk, pci, 1);
        const char *mod = *getArgReference_str(stk, pci, 2);
        const char *fcn = *getArgReference_str(stk, pci, 3);
        (void)mb;
-       return RMTregisterInternal(cntxt, conn, mod, fcn);
+       return RMTregisterInternal(cntxt, &fcn_id, conn, mod, fcn);
 }
 
 /**
@@ -1615,7 +1642,7 @@ mel_func remote_init_funcs[] = {
  command("remote", "disconnect", RMTdisconnect, false, "disconnects the 
connection pointed to by handle (received from a call to connect()", args(1,2, 
arg("",void),arg("conn",str))),
  pattern("remote", "get", RMTget, false, "retrieves a copy of remote object 
ident", args(1,3, argany("",0),arg("conn",str),arg("ident",str))),
  pattern("remote", "put", RMTput, false, "copies object to the remote site and 
returns its identifier", args(1,3, 
arg("",str),arg("conn",str),argany("object",0))),
- pattern("remote", "register", RMTregister, false, "register <mod>.<fcn> at 
the remote site", args(1,4, 
arg("",void),arg("conn",str),arg("mod",str),arg("fcn",str))),
+ pattern("remote", "register", RMTregister, false, "register <mod>.<fcn> at 
the remote site", args(1,4, 
arg("",str),arg("conn",str),arg("mod",str),arg("fcn",str))),
  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and 
returns the handle to its result", args(1,4, 
arg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and 
returns the handle to its result", args(1,4, 
vararg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> 
using the argument list of remote objects and returns the handle to its 
result", args(1,5, 
arg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))),
diff --git a/tools/monetdbe/monetdbe.c b/tools/monetdbe/monetdbe.c
--- a/tools/monetdbe/monetdbe.c
+++ b/tools/monetdbe/monetdbe.c
@@ -1429,6 +1429,7 @@ monetdbe_get_columns_remote(monetdbe_dat
                return mdbe->msg;
        }
 
+       *column_count = result->ncols;
        *column_names = GDKzalloc(sizeof(char*) * result->ncols);
        *column_types = GDKzalloc(sizeof(int) * result->ncols);
 
@@ -1447,12 +1448,7 @@ monetdbe_get_columns_remote(monetdbe_dat
                }
 
                (*column_names)[c] = rcol->name;
-
-               int tpe = monetdbe_type(rcol->type);
-               if (tpe == -1) {
-                       // TODO: handle error
-               }
-               (*column_types)[c] = tpe;
+               (*column_types)[c] = rcol->type;
        }
 
        return mdbe->msg;
@@ -1611,6 +1607,9 @@ monetdbe_append(monetdbe_database dbhdl,
        size_t i, cnt;
        node *n;
 
+       char buf[16] = {0};
+       char* remote_program_name = NULL;
+
 
        if ((mdbe->msg = validate_database_handle(mdbe, 
"monetdbe.monetdbe_append")) != MAL_SUCCEED) {
                return mdbe->msg;
@@ -1618,8 +1617,6 @@ monetdbe_append(monetdbe_database dbhdl,
 
        if ((mdbe->msg = getSQLContext(mdbe->c, NULL, &m, NULL)) != MAL_SUCCEED)
                goto cleanup;
-       if ((mdbe->msg = SQLtrans(m)) != MAL_SUCCEED)
-               goto cleanup;
 
        if (schema == NULL) {
                mdbe->msg = createException(MAL, "monetdbe.monetdbe_append", 
"schema parameter is NULL");
@@ -1640,22 +1637,32 @@ monetdbe_append(monetdbe_database dbhdl,
 
        if (mdbe->mid) {
                // We are going to insert the data into a temporary table which 
is used in the coming remote logic.
-               if (!(t = create_sql_table(m->sa, NULL, tt_table, 0, 
SQL_DECLARED_TABLE, CA_COMMIT, 0))) {
-                       mdbe->msg = createException(SQL, 
"monetdbe.monetdbe_append", "Cannot create temporary table");
-                       goto cleanup;
-               }
 
                size_t actual_column_count;
-               char** column_names;
-               int* column_types;
+               char** actual_column_names;
+               int* actual_column_types;
 
                if ((mdbe->msg = monetdbe_get_columns_remote(
                                mdbe,
                                schema,
                                table,
                                &actual_column_count,
-                               &column_names,
-                               &column_types)) != MAL_SUCCEED) {
+                               &actual_column_names,
+                               &actual_column_types)) != MAL_SUCCEED) {
+                       goto cleanup;
+               }
+
+               if ((mdbe->msg = SQLtrans(m)) != MAL_SUCCEED)
+                       goto cleanup;
+
+               sql_schema* s;
+               if (!(s = find_sql_schema(m->session->tr, "tmp"))) {
+                       // TODO handle error
+                       goto cleanup;
+               }
+
+               if (!(t = sql_trans_create_table(m->session->tr, s, table, 
NULL, tt_table, false, SQL_DECLARED_TABLE, CA_COMMIT, -1, 0))) {
+                       mdbe->msg = createException(SQL, 
"monetdbe.monetdbe_append", "Cannot create temporary table");
                        goto cleanup;
                }
 
@@ -1663,24 +1670,78 @@ monetdbe_append(monetdbe_database dbhdl,
                        // TODO handle error
                }
 
-               for (i = 0; i < column_count && n; i++) {
-                       sql_type *t = SA_ZNEW(m->sa, sql_type);
-                       t->localtype = monetdbe_type(column_types[i]);
+               const char *mod         = "user";
+               remote_program_name     = number2name(
+                                                                               
        buf,
+                                                                               
        sizeof(buf),
+                                                                               
        ++((backend*)  mdbe->c->sqlcontext)->remote);
+
+               Symbol prg      = newFunction(putName(mod), 
putName(remote_program_name), FUNCTIONsymbol); // remote program
+               MalBlkPtr mb    = prg->def;
+               InstrPtr f = getInstrPtr(mb, 0);
+               f->retc = f->argc = 0;
+               f = pushReturn(mb, f, newTmpVariable(mb, TYPE_int));
+               InstrPtr c = newInstruction(mb, sqlRef, mvcRef);
+               c = pushReturn(mb, c, newTmpVariable(mb, TYPE_int));
+               pushInstruction(mb, c);
 
+               for (i = 0; i < column_count; i++) {
+
+                       if (strcmp(actual_column_names[i], input[i]->name) != 0 
|| actual_column_types[i] != (int) input[i]->type ) {
+                               // TODO handle error
+                               goto cleanup;
+                       }
+
+                       sql_type *tpe = SA_ZNEW(m->sa, sql_type);
+                       tpe->localtype = monetdbe_type(actual_column_types[i]);
                        sql_subtype *st = SA_ZNEW(m->sa, sql_subtype);
-                       sql_init_subtype(st, t, 0, 0);
+                       sql_init_subtype(st, tpe, 0, 0);
 
-                       if (!mvc_create_column(m, t, column_names[i], &st)) {
+                       sql_column* col;
+                       if (!(col = mvc_create_column(m, t, 
actual_column_names[i], st))) {
                                mdbe->msg = createException(MAL, 
"monetdbe.monetdbe_append", MAL_MALLOC_FAIL);
                                goto cleanup;
                                // TODO handle error
                        }
+
+                       if (store_funcs.create_col(m->session->tr, col) != 
LOG_OK) {
+                               mdbe->msg = createException(MAL, 
"monetdbe.monetdbe_append", MAL_MALLOC_FAIL);
+                               goto cleanup;
+                               // TODO handle error
+                       }
+
+                       int idx = newVariable(mb, NULL, 0, 
newBatType(tpe->localtype));
+                       f = pushArgument(mb, f, idx);
+
+                       InstrPtr a = newInstruction(mb, sqlRef, appendRef);
+                       setDestVar(a, newTmpVariable(mb, TYPE_any));
+                       a = pushArgument(mb, a, getArg(c, 0));
+                       a = pushStr(mb, a, schema);
+                       a = pushStr(mb, a, table);
+                       a = pushStr(mb, a, actual_column_names[i]);
+                       a = pushArgument(mb, a, idx);
+                       pushInstruction(mb, a);
                }
+
+               InstrPtr r = newInstruction(mb, NULL, NULL);
+               r->barrier= RETURNsymbol;
+               r->retc = r->argc = 0;
+               r = pushReturn(mb, r, getArg(c, 0));
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to