Changeset: e509a5024a21 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e509a5024a21
Modified Files:
        monetdb5/optimizer/opt_remoteQueries.c
        sql/backends/monet5/rel_bin.c
        sql/backends/monet5/sql_execute.c
        sql/backends/monet5/sql_gencode.c
        sql/backends/monet5/sql_statement.c
        sql/server/rel_distribute.c
        sql/server/rel_updates.c
Branch: acticloud
Log Message:

Progress on insertions into remote tables. Send the insert relation to the
remote host and return the number of rows inserted.


diffs (truncated from 661 to 300 lines):

diff --git a/monetdb5/optimizer/opt_remoteQueries.c b/monetdb5/optimizer/opt_remoteQueries.c
--- a/monetdb5/optimizer/opt_remoteQueries.c
+++ b/monetdb5/optimizer/opt_remoteQueries.c
@@ -298,7 +298,7 @@ OPTremoteQueriesImplementation(Client cn
                                        collectFirst= TRUE;
                        }
                        if( getModuleId(p)== ioRef || (getModuleId(p)== sqlRef
-                           && (getFunctionId(p)== resultSetRef ||
+                           && (getFunctionId(p)== resultSetRef || 
getFunctionId(p)== affectedRowsRef ||
                                getFunctionId(p)== rsColumnRef)))
                                 collectFirst= TRUE;
 
diff --git a/sql/backends/monet5/rel_bin.c b/sql/backends/monet5/rel_bin.c
--- a/sql/backends/monet5/rel_bin.c
+++ b/sql/backends/monet5/rel_bin.c
@@ -577,7 +577,7 @@ exp_bin(backend *be, sql_exp *e, stmt *l
                }
                if (cond_execution) {
                        /* var_x = nil; */
-                       nme = number2name(name, 16, ++sql->label);
+                       nme = number2name(name, sizeof(name), ++sql->label);
                        (void)stmt_var(be, nme, exp_subtype(e), 1, 2);
                        /* if_barrier ... */
                        cond_execution = stmt_cond(be, cond_execution, NULL, 0, 
0);
@@ -1613,7 +1613,7 @@ rel2bin_table(backend *be, sql_rel *rel,
                char name[16], *nme;
                sql_rel *fr;
 
-               nme = number2name(name, 16, ++sql->remote);
+               nme = number2name(name, sizeof(name), ++sql->remote);
 
                l = rel2bin_args(be, rel->l, sa_list(sql->sa));
                if(!l)
@@ -3549,7 +3549,7 @@ rel2bin_insert(backend *be, sql_rel *rel
        }
 
 /* before */
-       if(be->cur_append && !be->first_statement_generated) {
+       if (be->cur_append && !be->first_statement_generated) {
                for(sql_table *up = t->p ; up ; up = up->p) {
                        if (!sql_insert_triggers(be, up, updates, 0))
                                return sql_error(sql, 02, SQLSTATE(27000) 
"INSERT INTO: triggers failed for table '%s'", up->base.name);
@@ -3577,19 +3577,28 @@ rel2bin_insert(backend *be, sql_rel *rel
                        is = stmt_append_idx(be, i, is);
        }
 
-       for (n = t->columns.set->h, m = inserts->op4.lval->h; 
-               n && m; n = n->next, m = m->next) {
-
-               stmt *ins = m->data;
-               sql_column *c = n->data;
-
-               insert = stmt_append_col(be, c, ins, rel->flag&UPD_LOCKED);
-               append(l,insert);
+       if (isRemote(t)) {
+               stmt *sub;
+               char name[16], *nme = number2name(name, sizeof(name), 
++sql->remote);
+               list *ll = rel2bin_args(be, rel->r, sa_list(sql->sa));
+
+               if (!ll)
+                       return NULL;
+               sub = stmt_list(be, ll);
+               insert = stmt_func(be, sub, sa_strdup(sql->sa, nme), rel, 0);
+       } else {
+               for (n = t->columns.set->h, m = inserts->op4.lval->h; n && m; n 
= n->next, m = m->next) {
+                       stmt *ins = m->data;
+                       sql_column *c = n->data;
+
+                       insert = stmt_append_col(be, c, ins, 
rel->flag&UPD_LOCKED);
+                       append(l, insert);
+               }
        }
        if (!insert)
                return NULL;
 
-       if(be->cur_append && !be->first_statement_generated) {
+       if (be->cur_append && !be->first_statement_generated) {
                for(sql_table *up = t->p ; up ; up = up->p) {
                        if (!sql_insert_triggers(be, up, updates, 1))
                                return sql_error(sql, 02, SQLSTATE(27000) 
"INSERT INTO: triggers failed for table '%s'", up->base.name);
@@ -3597,11 +3606,14 @@ rel2bin_insert(backend *be, sql_rel *rel
        }
        if (!sql_insert_triggers(be, t, updates, 1)) 
                return sql_error(sql, 02, SQLSTATE(27000) "INSERT INTO: 
triggers failed for table '%s'", t->base.name);
+
        if (ddl) {
                ret = ddl;
                list_prepend(l, ddl);
        } else {
-               if (insert->op1->nrcols == 0) {
+               if (isRemote(t)) {
+                       s = insert->op2;
+               } else if (insert->op1->nrcols == 0) {
                        s = stmt_atom_lng(be, 1);
                } else {
                        s = stmt_aggr(be, insert->op1, NULL, NULL, 
sql_bind_aggr(sql->sa, sql->session->schema, "count", NULL), 1, 0, 1);
@@ -3609,7 +3621,7 @@ rel2bin_insert(backend *be, sql_rel *rel
                ret = s;
        }
 
-       if(be->cur_append) //building the total number of rows affected across 
all tables
+       if (be->cur_append) //building the total number of rows affected across 
all tables
                ret->nr = add_to_merge_partitions_accumulator(be, ret->nr);
 
        if (ddl)
diff --git a/sql/backends/monet5/sql_execute.c b/sql/backends/monet5/sql_execute.c
--- a/sql/backends/monet5/sql_execute.c
+++ b/sql/backends/monet5/sql_execute.c
@@ -930,8 +930,8 @@ RAstatement2(Client cntxt, MalBlkPtr mb,
        str *sig = getArgReference_str(stk, pci, 4), c = *sig;
        backend *be = NULL;
        mvc *m = NULL;
-       str msg;
-       sql_rel *rel;
+       str msg = MAL_SUCCEED;
+       sql_rel *rel = NULL;
        list *refs, *ops;
        char buf[BUFSIZ];
 
@@ -939,17 +939,20 @@ RAstatement2(Client cntxt, MalBlkPtr mb,
                return msg;
        if ((msg = checkSQLContext(cntxt)) != NULL)
                return msg;
-       SQLtrans(m);
        if (!m->sa)
                m->sa = sa_create();
        if (!m->sa)
                return createException(SQL,"RAstatement2",SQLSTATE(HY001) 
MAL_MALLOC_FAIL);
 
+       SQLtrans(m);
+
        /* keep copy of signature and relational expression */
        snprintf(buf, BUFSIZ, "%s %s", *sig, *expr);
 
-       if(!stack_push_frame(m, NULL))
-               return createException(SQL,"RAstatement2",SQLSTATE(HY001) 
MAL_MALLOC_FAIL);
+       if (!stack_push_frame(m, NULL)) {
+               msg = createException(SQL,"RAstatement2",SQLSTATE(HY001) 
MAL_MALLOC_FAIL);
+               goto cleanup;
+       }
        ops = sa_list(m->sa);
        while (c && *c && !isspace((unsigned char) *c)) {
                char *vnme = c, *tnme;
@@ -968,7 +971,8 @@ RAstatement2(Client cntxt, MalBlkPtr mb,
                tnme = sa_strdup(m->sa, tnme);
                if (!tnme) {
                        stack_pop_frame(m);
-                       return 
createException(SQL,"RAstatement2",SQLSTATE(HY001) MAL_MALLOC_FAIL);
+                       msg = 
createException(SQL,"RAstatement2",SQLSTATE(HY001) MAL_MALLOC_FAIL);
+                       goto cleanup;
                }
                d = strtol(p, &p, 10);
                p++; /* skip , */
@@ -982,14 +986,16 @@ RAstatement2(Client cntxt, MalBlkPtr mb,
                 * */
                if (nr >= 0) { 
                        append(ops, exp_atom_ref(m->sa, nr, &t));
-                       if(!sql_set_arg(m, nr, a)) {
+                       if (!sql_set_arg(m, nr, a)) {
                                stack_pop_frame(m);
-                               return 
createException(SQL,"RAstatement2",SQLSTATE(HY001) MAL_MALLOC_FAIL);
+                               msg = 
createException(SQL,"RAstatement2",SQLSTATE(HY001) MAL_MALLOC_FAIL);
+                               goto cleanup;
                        }
                } else {
-                       if(!stack_push_var(m, vnme+1, &t)) {
+                       if (!stack_push_var(m, vnme+1, &t)) {
                                stack_pop_frame(m);
-                               return 
createException(SQL,"RAstatement2",SQLSTATE(HY001) MAL_MALLOC_FAIL);
+                               msg = 
createException(SQL,"RAstatement2",SQLSTATE(HY001) MAL_MALLOC_FAIL);
+                               goto cleanup;
                        }
                        append(ops, exp_var(m->sa, sa_strdup(m->sa, vnme+1), 
&t, m->frame));
                }
@@ -1002,9 +1008,19 @@ RAstatement2(Client cntxt, MalBlkPtr mb,
        stack_pop_frame(m);
        if (rel)
                rel = rel_optimizer(m, rel, 0);
-       if (!rel || monet5_create_relational_function(m, *mod, *nme, rel, NULL, 
ops, 0) < 0)
-               throw(SQL, "sql.register", SQLSTATE(42000) "Cannot register 
%s", buf);
-       rel_destroy(rel);
+       if (!rel) {
+               if (strlen(m->errstr) > 6 && m->errstr[5] == '!')
+                       msg = createException(SQL, "RAstatement2", "%s", 
m->errstr);
+               else
+                       msg = createException(SQL, "RAstatement2", 
SQLSTATE(42000) "%s", m->errstr);
+               goto cleanup;
+       }
+       if (monet5_create_relational_function(m, *mod, *nme, rel, NULL, ops, 0) 
< 0)
+               msg = createException(SQL,"RAstatement2",SQLSTATE(42000) "Could 
not generate monet5 relational function");
+
+cleanup:
+       if (rel)
+               rel_destroy(rel);
        sqlcleanup(m, 0);
        return msg;
 }
diff --git a/sql/backends/monet5/sql_gencode.c b/sql/backends/monet5/sql_gencode.c
--- a/sql/backends/monet5/sql_gencode.c
+++ b/sql/backends/monet5/sql_gencode.c
@@ -33,7 +33,6 @@
 #include "sql_scenario.h"
 #include "sql_mvc.h"
 #include "sql_qc.h"
-#include "sql_optimizer.h"
 #include "mal_namespace.h"
 #include "opt_prelude.h"
 #include "querylog.h"
@@ -127,17 +126,23 @@ relational_func_create_result(mvc *sql, 
 
        if (q == NULL)
                return NULL;
-       if (is_topn(r->op))
-               r = r->l;
-       if (!is_project(r->op))
-               r = rel_project(sql->sa, r, rel_projections(sql, r, NULL, 1, 
1));
+
        q->argc = q->retc = 0;
-       for (i = 0, n = r->exps->h; n; n = n->next, i++) {
-               sql_exp *e = n->data;
-               int type = exp_subtype(e)->type->localtype;
+       if (is_modify(r->op)) {
+               q = pushReturn(mb, q, newTmpVariable(mb, TYPE_lng)); /* number 
of rows affected */
+       } else {
+               if (is_topn(r->op))
+                       r = r->l;
+               if (!is_project(r->op))
+                       r = rel_project(sql->sa, r, rel_projections(sql, r, 
NULL, 1, 1));
 
-               type = newBatType(type);
-               q = pushReturn(mb, q, newTmpVariable(mb, type));
+               for (i = 0, n = r->exps->h; n; n = n->next, i++) {
+                       sql_exp *e = n->data;
+                       int type = exp_subtype(e)->type->localtype;
+
+                       type = newBatType(type);
+                       q = pushReturn(mb, q, newTmpVariable(mb, type));
+               }
        }
        return q;
 }
@@ -279,31 +284,36 @@ static int
        Symbol backup = NULL;
        const char *local_tbl = prp->value;
        node *n;
-       int i, q, v;
-       int *lret, *rret;
+       prop *pl;
+       int i, q, v, lrow_count = 0, rrow_count = 0, *lret, *rret;
        char *lname;
        sql_rel *r = rel;
 
-       if(local_tbl == NULL)
+       if (local_tbl == NULL)
                return -1;
-
-       lname = GDKstrdup(name);
-       if(lname == NULL)
+       if (!(lname = GDKstrdup(name)))
                return -1;
 
        if (is_topn(r->op))
                r = r->l;
+       if (is_modify(r->op)) {
+               r = r->r; /* what will be inserted/updated/deleted */
+               if ((pl = find_prop(rel->p, PROP_REMOTE))) /* remove the remote 
tag */
+                       rel->p = prop_remove(rel->p, pl);
+       }
        if (!is_project(r->op))
                r = rel_project(m->sa, r, rel_projections(m, r, NULL, 1, 1));
-       lret = SA_NEW_ARRAY(m->sa, int, list_length(r->exps));
-       if(lret == NULL) {
-               GDKfree(lname);
-               return -1;
-       }
-       rret = SA_NEW_ARRAY(m->sa, int, list_length(r->exps));
-       if(rret == NULL) {
-               GDKfree(lname);
-               return -1;
+       if (!is_modify(rel->op)) {
+               lret = SA_NEW_ARRAY(m->sa, int, list_length(r->exps));
+               if(lret == NULL) {
+                       GDKfree(lname);
+                       return -1;
+               }
+               rret = SA_NEW_ARRAY(m->sa, int, list_length(r->exps));
+               if(rret == NULL) {
+                       GDKfree(lname);
+                       return -1;
+               }
        }
 
        /* create stub */
@@ -336,7 +346,7 @@ static int
                        const char *nme = 
(op->op3)?op->op3->op4.aval->data.val.sval:op->cname;
                        char buf[64];
 
-                       snprintf(buf,64,"A%s",nme);
+                       snprintf(buf, sizeof(buf),"A%s",nme);
                        varid = newVariable(curBlk, buf,strlen(buf), type);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to