Changeset: e2aa9541880b for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e2aa9541880b
Modified Files:
ctest/tools/monetdbe/example_proxy.c
design.txt
monetdb5/modules/mal/remote.c
tools/monetdbe/monetdbe.c
Branch: monetdbe-proxy
Log Message:
Work in progress: proxified monetdbe_append compiles.
diffs (truncated from 383 to 300 lines):
diff --git a/ctest/tools/monetdbe/example_proxy.c
b/ctest/tools/monetdbe/example_proxy.c
--- a/ctest/tools/monetdbe/example_proxy.c
+++ b/ctest/tools/monetdbe/example_proxy.c
@@ -33,15 +33,18 @@ main(void)
if (monetdbe_open(&mdbe,
"mapi:monetdb://127.0.0.1:50000?database=devdb", &opt))
error("Failed to open database")
- if ((err = monetdbe_query(mdbe, "SELECT x, y, 1 AS some_int FROM test;
", &result, NULL)) != NULL)
+ if ((err = monetdbe_query(mdbe, "SELECT * FROM test; ", &result, NULL))
!= NULL)
error(err)
+ monetdbe_column* appendable_columns[2];
+
fprintf(stdout, "Query result with %zu cols and %"PRId64" rows\n",
result->ncols, result->nrows);
for (int64_t r = 0; r < result->nrows; r++) {
for (size_t c = 0; c < result->ncols; c++) {
monetdbe_column* rcol;
if ((err = monetdbe_result_fetch(result, &rcol, c)) !=
NULL)
error(err)
+ appendable_columns[c] = rcol;
switch (rcol->type) {
case monetdbe_int8_t: {
monetdbe_column_int8_t * col =
(monetdbe_column_int8_t *) rcol;
@@ -91,6 +94,9 @@ main(void)
printf("\n");
}
+ if ((err = monetdbe_append(mdbe, "sys", "test", appendable_columns, 2))
!= NULL)
+ error(err)
+
if ((err = monetdbe_cleanup_result(mdbe, result)) != NULL)
error(err)
@@ -110,6 +116,7 @@ main(void)
if ((err = monetdbe_execute(stmt, &result, NULL)) != NULL)
error(err)
+
fprintf(stdout, "Query result with %zu cols and %"PRId64" rows\n",
result->ncols, result->nrows);
for (int64_t r = 0; r < result->nrows; r++) {
for (size_t c = 0; c < result->ncols; c++) {
diff --git a/design.txt b/design.txt
--- a/design.txt
+++ b/design.txt
@@ -80,6 +80,9 @@ function user.%temp(X1, ..., XN)
sql.affectedRows(m, c);
end
+function user.%temp2()
+ remote.put()
+
Register this function and remotely execute it:
remote.register(conn, user, %temp)
remote.exec(conn, user, %temp, RB1, ..., RBN)
diff --git a/monetdb5/modules/mal/remote.c b/monetdb5/modules/mal/remote.c
--- a/monetdb5/modules/mal/remote.c
+++ b/monetdb5/modules/mal/remote.c
@@ -70,7 +70,6 @@
#ifdef HAVE_MAPI
-
#include "mal_exception.h"
#include "mal_interpreter.h"
#include "mal_function.h" /* for printFunction */
@@ -1138,7 +1137,7 @@ static str RMTput(Client cntxt, MalBlkPt
* The implementation is based on serialisation of the block into a string
* followed by remote parsing.
*/
-static str RMTregisterInternal(Client cntxt, const char *conn, const char
*mod, const char *fcn)
+static str RMTregisterInternal(Client cntxt, char** fcn_id, const char *conn,
const char *mod, const char *fcn)
{
str tmp, qry, msg;
connection c;
@@ -1176,32 +1175,60 @@ static str RMTregisterInternal(Client cn
if (mhdl)
mapi_close_handle(mhdl);
+ /* get a free, typed identifier for the remote host */
+ char ident[BUFSIZ];
+ tmp = RMTgetId(ident, sym->def, getInstrPtr(sym->def, 0), 0);
+ if (tmp != MAL_SUCCEED) {
+ MT_lock_unset(&c->lock);
+ return tmp;
+ }
+
+ *fcn_id = GDKmalloc(strlen(ident));
+ if (*fcn_id == NULL) {
+ //TODO: handle error
+ }
+
+ strcpy(*fcn_id, ident);
+
+ Symbol prg;
+
+ if ((prg = newFunction(putName(mod), putName(*fcn_id), FUNCTIONsymbol))
== NULL) {
+ // TODO: handle error
+ return createException(MAL, "Remote register", MAL_MALLOC_FAIL);
+ }
+ prg->def = copyMalBlk(sym->def);
+ // TODO: handle if == NULL
+ setFunctionId(getInstrPtr(prg->def, 0), putName(*fcn_id));
+
/* make sure the program is error free */
- msg = chkProgram(cntxt->usermodule, sym->def);
- if ( msg == MAL_SUCCEED || sym->def->errors) {
+ msg = chkProgram(cntxt->usermodule, prg->def);
+ if ( msg != MAL_SUCCEED || prg->def->errors) {
MT_lock_unset(&c->lock);
throw(MAL, "remote.register",
"function '%s.%s' contains syntax or type
errors",
- mod, fcn);
+ mod, *fcn_id);
}
- qry = mal2str(sym->def, 0, sym->def->stop);
+ qry = mal2str(prg->def, 0, prg->def->stop);
TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, qry);
msg = RMTquery(&mhdl, "remote.register", c->mconn, qry);
GDKfree(qry);
if (mhdl)
mapi_close_handle(mhdl);
+ freeSymbol(prg);
+
MT_lock_unset(&c->lock);
return msg;
}
static str RMTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci) {
+ char *fcn_id = *getArgReference_str(stk, pci, 0);
const char *conn = *getArgReference_str(stk, pci, 1);
const char *mod = *getArgReference_str(stk, pci, 2);
const char *fcn = *getArgReference_str(stk, pci, 3);
(void)mb;
- return RMTregisterInternal(cntxt, conn, mod, fcn);
+ return RMTregisterInternal(cntxt, &fcn_id, conn, mod, fcn);
}
/**
@@ -1615,7 +1642,7 @@ mel_func remote_init_funcs[] = {
command("remote", "disconnect", RMTdisconnect, false, "disconnects the
connection pointed to by handle (received from a call to connect()", args(1,2,
arg("",void),arg("conn",str))),
pattern("remote", "get", RMTget, false, "retrieves a copy of remote object
ident", args(1,3, argany("",0),arg("conn",str),arg("ident",str))),
pattern("remote", "put", RMTput, false, "copies object to the remote site and
returns its identifier", args(1,3,
arg("",str),arg("conn",str),argany("object",0))),
- pattern("remote", "register", RMTregister, false, "register <mod>.<fcn> at
the remote site", args(1,4,
arg("",void),arg("conn",str),arg("mod",str),arg("fcn",str))),
+ pattern("remote", "register", RMTregister, false, "register <mod>.<fcn> at
the remote site", args(1,4,
arg("",str),arg("conn",str),arg("mod",str),arg("fcn",str))),
pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and
returns the handle to its result", args(1,4,
arg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and
returns the handle to its result", args(1,4,
vararg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func>
using the argument list of remote objects and returns the handle to its
result", args(1,5,
arg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))),
diff --git a/tools/monetdbe/monetdbe.c b/tools/monetdbe/monetdbe.c
--- a/tools/monetdbe/monetdbe.c
+++ b/tools/monetdbe/monetdbe.c
@@ -1429,6 +1429,7 @@ monetdbe_get_columns_remote(monetdbe_dat
return mdbe->msg;
}
+ *column_count = result->ncols;
*column_names = GDKzalloc(sizeof(char*) * result->ncols);
*column_types = GDKzalloc(sizeof(int) * result->ncols);
@@ -1447,12 +1448,7 @@ monetdbe_get_columns_remote(monetdbe_dat
}
(*column_names)[c] = rcol->name;
-
- int tpe = monetdbe_type(rcol->type);
- if (tpe == -1) {
- // TODO: handle error
- }
- (*column_types)[c] = tpe;
+ (*column_types)[c] = rcol->type;
}
return mdbe->msg;
@@ -1611,6 +1607,9 @@ monetdbe_append(monetdbe_database dbhdl,
size_t i, cnt;
node *n;
+ char buf[16] = {0};
+ char* remote_program_name = NULL;
+
if ((mdbe->msg = validate_database_handle(mdbe,
"monetdbe.monetdbe_append")) != MAL_SUCCEED) {
return mdbe->msg;
@@ -1618,8 +1617,6 @@ monetdbe_append(monetdbe_database dbhdl,
if ((mdbe->msg = getSQLContext(mdbe->c, NULL, &m, NULL)) != MAL_SUCCEED)
goto cleanup;
- if ((mdbe->msg = SQLtrans(m)) != MAL_SUCCEED)
- goto cleanup;
if (schema == NULL) {
mdbe->msg = createException(MAL, "monetdbe.monetdbe_append",
"schema parameter is NULL");
@@ -1640,22 +1637,32 @@ monetdbe_append(monetdbe_database dbhdl,
if (mdbe->mid) {
// We are going to insert the data into a temporary table which
is used in the coming remote logic.
- if (!(t = create_sql_table(m->sa, NULL, tt_table, 0,
SQL_DECLARED_TABLE, CA_COMMIT, 0))) {
- mdbe->msg = createException(SQL,
"monetdbe.monetdbe_append", "Cannot create temporary table");
- goto cleanup;
- }
size_t actual_column_count;
- char** column_names;
- int* column_types;
+ char** actual_column_names;
+ int* actual_column_types;
if ((mdbe->msg = monetdbe_get_columns_remote(
mdbe,
schema,
table,
&actual_column_count,
- &column_names,
- &column_types)) != MAL_SUCCEED) {
+ &actual_column_names,
+ &actual_column_types)) != MAL_SUCCEED) {
+ goto cleanup;
+ }
+
+ if ((mdbe->msg = SQLtrans(m)) != MAL_SUCCEED)
+ goto cleanup;
+
+ sql_schema* s;
+ if (!(s = find_sql_schema(m->session->tr, "tmp"))) {
+ // TODO handle error
+ goto cleanup;
+ }
+
+ if (!(t = sql_trans_create_table(m->session->tr, s, table,
NULL, tt_table, false, SQL_DECLARED_TABLE, CA_COMMIT, -1, 0))) {
+ mdbe->msg = createException(SQL,
"monetdbe.monetdbe_append", "Cannot create temporary table");
goto cleanup;
}
@@ -1663,24 +1670,78 @@ monetdbe_append(monetdbe_database dbhdl,
// TODO handle error
}
- for (i = 0; i < column_count && n; i++) {
- sql_type *t = SA_ZNEW(m->sa, sql_type);
- t->localtype = monetdbe_type(column_types[i]);
+ const char *mod = "user";
+ remote_program_name = number2name(
+
buf,
+
sizeof(buf),
+
++((backend*) mdbe->c->sqlcontext)->remote);
+
+ Symbol prg = newFunction(putName(mod),
putName(remote_program_name), FUNCTIONsymbol); // remote program
+ MalBlkPtr mb = prg->def;
+ InstrPtr f = getInstrPtr(mb, 0);
+ f->retc = f->argc = 0;
+ f = pushReturn(mb, f, newTmpVariable(mb, TYPE_int));
+ InstrPtr c = newInstruction(mb, sqlRef, mvcRef);
+ c = pushReturn(mb, c, newTmpVariable(mb, TYPE_int));
+ pushInstruction(mb, c);
+ for (i = 0; i < column_count; i++) {
+
+ if (strcmp(actual_column_names[i], input[i]->name) != 0
|| actual_column_types[i] != (int) input[i]->type ) {
+ // TODO handle error
+ goto cleanup;
+ }
+
+ sql_type *tpe = SA_ZNEW(m->sa, sql_type);
+ tpe->localtype = monetdbe_type(actual_column_types[i]);
sql_subtype *st = SA_ZNEW(m->sa, sql_subtype);
- sql_init_subtype(st, t, 0, 0);
+ sql_init_subtype(st, tpe, 0, 0);
- if (!mvc_create_column(m, t, column_names[i], &st)) {
+ sql_column* col;
+ if (!(col = mvc_create_column(m, t,
actual_column_names[i], st))) {
mdbe->msg = createException(MAL,
"monetdbe.monetdbe_append", MAL_MALLOC_FAIL);
goto cleanup;
// TODO handle error
}
+
+ if (store_funcs.create_col(m->session->tr, col) !=
LOG_OK) {
+ mdbe->msg = createException(MAL,
"monetdbe.monetdbe_append", MAL_MALLOC_FAIL);
+ goto cleanup;
+ // TODO handle error
+ }
+
+ int idx = newVariable(mb, NULL, 0,
newBatType(tpe->localtype));
+ f = pushArgument(mb, f, idx);
+
+ InstrPtr a = newInstruction(mb, sqlRef, appendRef);
+ setDestVar(a, newTmpVariable(mb, TYPE_any));
+ a = pushArgument(mb, a, getArg(c, 0));
+ a = pushStr(mb, a, schema);
+ a = pushStr(mb, a, table);
+ a = pushStr(mb, a, actual_column_names[i]);
+ a = pushArgument(mb, a, idx);
+ pushInstruction(mb, a);
}
+
+ InstrPtr r = newInstruction(mb, NULL, NULL);
+ r->barrier= RETURNsymbol;
+ r->retc = r->argc = 0;
+ r = pushReturn(mb, r, getArg(c, 0));
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list