Changeset: e509a5024a21 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e509a5024a21
Modified Files:
monetdb5/optimizer/opt_remoteQueries.c
sql/backends/monet5/rel_bin.c
sql/backends/monet5/sql_execute.c
sql/backends/monet5/sql_gencode.c
sql/backends/monet5/sql_statement.c
sql/server/rel_distribute.c
sql/server/rel_updates.c
Branch: acticloud
Log Message:
Progress on insertions into remote tables: send the insert relation to the
remote host and return the number of rows inserted.
diffs (truncated from 661 to 300 lines):
diff --git a/monetdb5/optimizer/opt_remoteQueries.c b/monetdb5/optimizer/opt_remoteQueries.c
--- a/monetdb5/optimizer/opt_remoteQueries.c
+++ b/monetdb5/optimizer/opt_remoteQueries.c
@@ -298,7 +298,7 @@ OPTremoteQueriesImplementation(Client cn
collectFirst= TRUE;
}
if( getModuleId(p)== ioRef || (getModuleId(p)== sqlRef
- && (getFunctionId(p)== resultSetRef ||
+ && (getFunctionId(p)== resultSetRef ||
getFunctionId(p)== affectedRowsRef ||
getFunctionId(p)== rsColumnRef)))
collectFirst= TRUE;
diff --git a/sql/backends/monet5/rel_bin.c b/sql/backends/monet5/rel_bin.c
--- a/sql/backends/monet5/rel_bin.c
+++ b/sql/backends/monet5/rel_bin.c
@@ -577,7 +577,7 @@ exp_bin(backend *be, sql_exp *e, stmt *l
}
if (cond_execution) {
/* var_x = nil; */
- nme = number2name(name, 16, ++sql->label);
+ nme = number2name(name, sizeof(name), ++sql->label);
(void)stmt_var(be, nme, exp_subtype(e), 1, 2);
/* if_barrier ... */
cond_execution = stmt_cond(be, cond_execution, NULL, 0,
0);
@@ -1613,7 +1613,7 @@ rel2bin_table(backend *be, sql_rel *rel,
char name[16], *nme;
sql_rel *fr;
- nme = number2name(name, 16, ++sql->remote);
+ nme = number2name(name, sizeof(name), ++sql->remote);
l = rel2bin_args(be, rel->l, sa_list(sql->sa));
if(!l)
@@ -3549,7 +3549,7 @@ rel2bin_insert(backend *be, sql_rel *rel
}
/* before */
- if(be->cur_append && !be->first_statement_generated) {
+ if (be->cur_append && !be->first_statement_generated) {
for(sql_table *up = t->p ; up ; up = up->p) {
if (!sql_insert_triggers(be, up, updates, 0))
return sql_error(sql, 02, SQLSTATE(27000)
"INSERT INTO: triggers failed for table '%s'", up->base.name);
@@ -3577,19 +3577,28 @@ rel2bin_insert(backend *be, sql_rel *rel
is = stmt_append_idx(be, i, is);
}
- for (n = t->columns.set->h, m = inserts->op4.lval->h;
- n && m; n = n->next, m = m->next) {
-
- stmt *ins = m->data;
- sql_column *c = n->data;
-
- insert = stmt_append_col(be, c, ins, rel->flag&UPD_LOCKED);
- append(l,insert);
+ if (isRemote(t)) {
+ stmt *sub;
+ char name[16], *nme = number2name(name, sizeof(name),
++sql->remote);
+ list *ll = rel2bin_args(be, rel->r, sa_list(sql->sa));
+
+ if (!ll)
+ return NULL;
+ sub = stmt_list(be, ll);
+ insert = stmt_func(be, sub, sa_strdup(sql->sa, nme), rel, 0);
+ } else {
+ for (n = t->columns.set->h, m = inserts->op4.lval->h; n && m; n
= n->next, m = m->next) {
+ stmt *ins = m->data;
+ sql_column *c = n->data;
+
+ insert = stmt_append_col(be, c, ins,
rel->flag&UPD_LOCKED);
+ append(l, insert);
+ }
}
if (!insert)
return NULL;
- if(be->cur_append && !be->first_statement_generated) {
+ if (be->cur_append && !be->first_statement_generated) {
for(sql_table *up = t->p ; up ; up = up->p) {
if (!sql_insert_triggers(be, up, updates, 1))
return sql_error(sql, 02, SQLSTATE(27000)
"INSERT INTO: triggers failed for table '%s'", up->base.name);
@@ -3597,11 +3606,14 @@ rel2bin_insert(backend *be, sql_rel *rel
}
if (!sql_insert_triggers(be, t, updates, 1))
return sql_error(sql, 02, SQLSTATE(27000) "INSERT INTO:
triggers failed for table '%s'", t->base.name);
+
if (ddl) {
ret = ddl;
list_prepend(l, ddl);
} else {
- if (insert->op1->nrcols == 0) {
+ if (isRemote(t)) {
+ s = insert->op2;
+ } else if (insert->op1->nrcols == 0) {
s = stmt_atom_lng(be, 1);
} else {
s = stmt_aggr(be, insert->op1, NULL, NULL,
sql_bind_aggr(sql->sa, sql->session->schema, "count", NULL), 1, 0, 1);
@@ -3609,7 +3621,7 @@ rel2bin_insert(backend *be, sql_rel *rel
ret = s;
}
- if(be->cur_append) //building the total number of rows affected across
all tables
+ if (be->cur_append) //building the total number of rows affected across
all tables
ret->nr = add_to_merge_partitions_accumulator(be, ret->nr);
if (ddl)
diff --git a/sql/backends/monet5/sql_execute.c b/sql/backends/monet5/sql_execute.c
--- a/sql/backends/monet5/sql_execute.c
+++ b/sql/backends/monet5/sql_execute.c
@@ -930,8 +930,8 @@ RAstatement2(Client cntxt, MalBlkPtr mb,
str *sig = getArgReference_str(stk, pci, 4), c = *sig;
backend *be = NULL;
mvc *m = NULL;
- str msg;
- sql_rel *rel;
+ str msg = MAL_SUCCEED;
+ sql_rel *rel = NULL;
list *refs, *ops;
char buf[BUFSIZ];
@@ -939,17 +939,20 @@ RAstatement2(Client cntxt, MalBlkPtr mb,
return msg;
if ((msg = checkSQLContext(cntxt)) != NULL)
return msg;
- SQLtrans(m);
if (!m->sa)
m->sa = sa_create();
if (!m->sa)
return createException(SQL,"RAstatement2",SQLSTATE(HY001)
MAL_MALLOC_FAIL);
+ SQLtrans(m);
+
/* keep copy of signature and relational expression */
snprintf(buf, BUFSIZ, "%s %s", *sig, *expr);
- if(!stack_push_frame(m, NULL))
- return createException(SQL,"RAstatement2",SQLSTATE(HY001)
MAL_MALLOC_FAIL);
+ if (!stack_push_frame(m, NULL)) {
+ msg = createException(SQL,"RAstatement2",SQLSTATE(HY001)
MAL_MALLOC_FAIL);
+ goto cleanup;
+ }
ops = sa_list(m->sa);
while (c && *c && !isspace((unsigned char) *c)) {
char *vnme = c, *tnme;
@@ -968,7 +971,8 @@ RAstatement2(Client cntxt, MalBlkPtr mb,
tnme = sa_strdup(m->sa, tnme);
if (!tnme) {
stack_pop_frame(m);
- return
createException(SQL,"RAstatement2",SQLSTATE(HY001) MAL_MALLOC_FAIL);
+ msg =
createException(SQL,"RAstatement2",SQLSTATE(HY001) MAL_MALLOC_FAIL);
+ goto cleanup;
}
d = strtol(p, &p, 10);
p++; /* skip , */
@@ -982,14 +986,16 @@ RAstatement2(Client cntxt, MalBlkPtr mb,
* */
if (nr >= 0) {
append(ops, exp_atom_ref(m->sa, nr, &t));
- if(!sql_set_arg(m, nr, a)) {
+ if (!sql_set_arg(m, nr, a)) {
stack_pop_frame(m);
- return
createException(SQL,"RAstatement2",SQLSTATE(HY001) MAL_MALLOC_FAIL);
+ msg =
createException(SQL,"RAstatement2",SQLSTATE(HY001) MAL_MALLOC_FAIL);
+ goto cleanup;
}
} else {
- if(!stack_push_var(m, vnme+1, &t)) {
+ if (!stack_push_var(m, vnme+1, &t)) {
stack_pop_frame(m);
- return
createException(SQL,"RAstatement2",SQLSTATE(HY001) MAL_MALLOC_FAIL);
+ msg =
createException(SQL,"RAstatement2",SQLSTATE(HY001) MAL_MALLOC_FAIL);
+ goto cleanup;
}
append(ops, exp_var(m->sa, sa_strdup(m->sa, vnme+1),
&t, m->frame));
}
@@ -1002,9 +1008,19 @@ RAstatement2(Client cntxt, MalBlkPtr mb,
stack_pop_frame(m);
if (rel)
rel = rel_optimizer(m, rel, 0);
- if (!rel || monet5_create_relational_function(m, *mod, *nme, rel, NULL,
ops, 0) < 0)
- throw(SQL, "sql.register", SQLSTATE(42000) "Cannot register
%s", buf);
- rel_destroy(rel);
+ if (!rel) {
+ if (strlen(m->errstr) > 6 && m->errstr[5] == '!')
+ msg = createException(SQL, "RAstatement2", "%s",
m->errstr);
+ else
+ msg = createException(SQL, "RAstatement2",
SQLSTATE(42000) "%s", m->errstr);
+ goto cleanup;
+ }
+ if (monet5_create_relational_function(m, *mod, *nme, rel, NULL, ops, 0)
< 0)
+ msg = createException(SQL,"RAstatement2",SQLSTATE(42000) "Could
not generate monet5 relational function");
+
+cleanup:
+ if (rel)
+ rel_destroy(rel);
sqlcleanup(m, 0);
return msg;
}
diff --git a/sql/backends/monet5/sql_gencode.c b/sql/backends/monet5/sql_gencode.c
--- a/sql/backends/monet5/sql_gencode.c
+++ b/sql/backends/monet5/sql_gencode.c
@@ -33,7 +33,6 @@
#include "sql_scenario.h"
#include "sql_mvc.h"
#include "sql_qc.h"
-#include "sql_optimizer.h"
#include "mal_namespace.h"
#include "opt_prelude.h"
#include "querylog.h"
@@ -127,17 +126,23 @@ relational_func_create_result(mvc *sql,
if (q == NULL)
return NULL;
- if (is_topn(r->op))
- r = r->l;
- if (!is_project(r->op))
- r = rel_project(sql->sa, r, rel_projections(sql, r, NULL, 1,
1));
+
q->argc = q->retc = 0;
- for (i = 0, n = r->exps->h; n; n = n->next, i++) {
- sql_exp *e = n->data;
- int type = exp_subtype(e)->type->localtype;
+ if (is_modify(r->op)) {
+ q = pushReturn(mb, q, newTmpVariable(mb, TYPE_lng)); /* number
of rows affected */
+ } else {
+ if (is_topn(r->op))
+ r = r->l;
+ if (!is_project(r->op))
+ r = rel_project(sql->sa, r, rel_projections(sql, r,
NULL, 1, 1));
- type = newBatType(type);
- q = pushReturn(mb, q, newTmpVariable(mb, type));
+ for (i = 0, n = r->exps->h; n; n = n->next, i++) {
+ sql_exp *e = n->data;
+ int type = exp_subtype(e)->type->localtype;
+
+ type = newBatType(type);
+ q = pushReturn(mb, q, newTmpVariable(mb, type));
+ }
}
return q;
}
@@ -279,31 +284,36 @@ static int
Symbol backup = NULL;
const char *local_tbl = prp->value;
node *n;
- int i, q, v;
- int *lret, *rret;
+ prop *pl;
+ int i, q, v, lrow_count = 0, rrow_count = 0, *lret, *rret;
char *lname;
sql_rel *r = rel;
- if(local_tbl == NULL)
+ if (local_tbl == NULL)
return -1;
-
- lname = GDKstrdup(name);
- if(lname == NULL)
+ if (!(lname = GDKstrdup(name)))
return -1;
if (is_topn(r->op))
r = r->l;
+ if (is_modify(r->op)) {
+ r = r->r; /* what will be inserted/updated/deleted */
+ if ((pl = find_prop(rel->p, PROP_REMOTE))) /* remove the remote
tag */
+ rel->p = prop_remove(rel->p, pl);
+ }
if (!is_project(r->op))
r = rel_project(m->sa, r, rel_projections(m, r, NULL, 1, 1));
- lret = SA_NEW_ARRAY(m->sa, int, list_length(r->exps));
- if(lret == NULL) {
- GDKfree(lname);
- return -1;
- }
- rret = SA_NEW_ARRAY(m->sa, int, list_length(r->exps));
- if(rret == NULL) {
- GDKfree(lname);
- return -1;
+ if (!is_modify(rel->op)) {
+ lret = SA_NEW_ARRAY(m->sa, int, list_length(r->exps));
+ if(lret == NULL) {
+ GDKfree(lname);
+ return -1;
+ }
+ rret = SA_NEW_ARRAY(m->sa, int, list_length(r->exps));
+ if(rret == NULL) {
+ GDKfree(lname);
+ return -1;
+ }
}
/* create stub */
@@ -336,7 +346,7 @@ static int
const char *nme =
(op->op3)?op->op3->op4.aval->data.val.sval:op->cname;
char buf[64];
- snprintf(buf,64,"A%s",nme);
+ snprintf(buf, sizeof(buf),"A%s",nme);
varid = newVariable(curBlk, buf,strlen(buf), type);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list