Changeset: c020080e066b for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=c020080e066b
Modified Files:
sql/backends/monet5/rel_bin.c
sql/include/sql_relation.h
sql/rel.txt
sql/server/rel_distribute.c
sql/server/rel_dump.c
sql/server/rel_exp.c
sql/server/rel_exp.h
sql/server/rel_optimizer.c
sql/server/rel_prop.c
sql/server/rel_prop.h
sql/storage/sql_catalog.c
sql/test/merge-partitions/Tests/mergepart06.sql
sql/test/merge-partitions/Tests/mergepart09.sql
Branch: merge-partitions
Log Message:
Added PSM_EXCEPTION expression and DDL_DISTRIBUTE, with validation in insert
statements.
This properly handles the distribution of insert/update/delete queries across partitions.
diffs (truncated from 696 to 300 lines):
diff --git a/sql/backends/monet5/rel_bin.c b/sql/backends/monet5/rel_bin.c
--- a/sql/backends/monet5/rel_bin.c
+++ b/sql/backends/monet5/rel_bin.c
@@ -423,6 +423,9 @@ exp_bin(backend *be, sql_exp *e, stmt *l
if (is_modify(rel->op) || is_ddl(rel->op))
return r;
return stmt_table(be, r, 1);
+ } else if (e->flag & PSM_EXCEPTION) {
+ stmt *cond = exp_bin(be, e->l, left, right, grp, cnt,
ext, sel);
+ return stmt_exception(be, cond, (const char *) e->r, 0);
}
break;
case e_atom: {
@@ -2147,7 +2150,7 @@ rel_rename(backend *be, sql_rel *rel, st
}
static stmt *
-rel2bin_union(backend *be, sql_rel *rel, list *refs, int* type)
+rel2bin_union(backend *be, sql_rel *rel, list *refs)
{
mvc *sql = be->mvc;
list *l;
@@ -2163,32 +2166,21 @@ rel2bin_union(backend *be, sql_rel *rel,
/* construct relation */
l = sa_list(sql->sa);
-
- if(find_prop(rel->p, PROP_DISTRIBUTE)) {
- if(left) //when distribution a delete/update/insert/truncate
just append both sub-relations
- list_append(l, left);
- if(right)
- list_append(l, right);
- *type = Q_UPDATE;
- } else {
- for( n = left->op4.lval->h, m = right->op4.lval->h; n && m;
- n = n->next, m = m->next ) {
- stmt *c1 = n->data;
- stmt *c2 = m->data;
- const char *rnme = table_name(sql->sa, c1);
- const char *nme = column_name(sql->sa, c1);
- stmt *s;
-
- s = stmt_append(be, create_const_column(be, c1), c2);
- s = stmt_alias(be, s, rnme, nme);
- list_append(l, s);
- }
- *type = Q_TABLE;
+ for( n = left->op4.lval->h, m = right->op4.lval->h; n && m;
+ n = n->next, m = m->next ) {
+ stmt *c1 = n->data;
+ stmt *c2 = m->data;
+ const char *rnme = table_name(sql->sa, c1);
+ const char *nme = column_name(sql->sa, c1);
+ stmt *s;
+
+ s = stmt_append(be, create_const_column(be, c1), c2);
+ s = stmt_alias(be, s, rnme, nme);
+ list_append(l, s);
}
sub = stmt_list(be, l);
- if(*type == Q_TABLE)
- sub = rel_rename(be, rel, sub);
+ sub = rel_rename(be, rel, sub);
if (need_distinct(rel))
sub = rel2bin_distinct(be, sub, NULL);
return sub;
@@ -3341,9 +3333,6 @@ rel2bin_insert(backend *be, sql_rel *rel
tr = rel->l;
}
- if(find_prop(rel->p, PROP_DISTRIBUTE) && be->cur_append == 0) /* create
BAT to hold the sum of affected rows */
- create_merge_partitions_accumulator(be);
-
if (tr->op == op_basetable) {
t = tr->l;
} else {
@@ -4598,9 +4587,6 @@ rel2bin_delete(backend *be, sql_rel *rel
else
assert(0/*ddl statement*/);
- if(find_prop(rel->p, PROP_DISTRIBUTE) && be->cur_append == 0) /* create
BAT to hold the sum of affected rows */
- create_merge_partitions_accumulator(be);
-
if (rel->r) { /* first construct the deletes relation */
rows = subrel_bin(be, rel->r, refs);
if (!rows)
@@ -4796,9 +4782,6 @@ rel2bin_truncate(backend *be, sql_rel *r
else
assert(0/*ddl statement*/);
- if(find_prop(rel->p, PROP_DISTRIBUTE) && be->cur_append == 0) /* create
BAT to hold the sum of affected rows */
- create_merge_partitions_accumulator(be);
-
n = rel->exps->h;
restart_sequences = E_ATOM_INT(n->data);
cascade = E_ATOM_INT(n->next->data);
@@ -4885,6 +4868,28 @@ rel2bin_psm(backend *be, sql_rel *rel)
}
static stmt *
+rel2bin_distribute(backend *be, sql_rel *rel, list *refs)
+{
+ stmt *l = NULL, *r = NULL;
+ node *n = NULL;
+ sql_exp *except = NULL;
+
+ if(be->cur_append == 0) /* create affected rows accumulator */
+ create_merge_partitions_accumulator(be);
+
+ if (rel->l) /* first construct the sub relation */
+ l = subrel_bin(be, rel->l, refs);
+ if (rel->r) /* first construct the sub relation */
+ r = subrel_bin(be, rel->r, refs);
+
+ if(rel->exps && list_length(rel->exps) == 1) {
+ n = rel->exps->h;
+ except = n->data;
+ }
+ return exp_bin(be, except, l, r, NULL, NULL, NULL, NULL);
+}
+
+static stmt *
rel2bin_seq(backend *be, sql_rel *rel, list *refs)
{
mvc *sql = be->mvc;
@@ -5023,6 +5028,9 @@ rel2bin_ddl(backend *be, sql_rel *rel, l
s = rel2bin_list(be, rel, refs);
} else if (rel->flag == DDL_PSM) {
s = rel2bin_psm(be, rel);
+ } else if (rel->flag == DDL_DISTRIBUTE) {
+ s = rel2bin_distribute(be, rel, refs);
+ sql->type = Q_UPDATE;
} else if (rel->flag <= DDL_ALTER_SEQ) {
s = rel2bin_seq(be, rel, refs);
sql->type = Q_SCHEMA;
@@ -5050,7 +5058,6 @@ subrel_bin(backend *be, sql_rel *rel, li
{
mvc *sql = be->mvc;
stmt *s = NULL;
- int type = Q_TABLE;
if (THRhighwater())
return NULL;
@@ -5087,8 +5094,8 @@ subrel_bin(backend *be, sql_rel *rel, li
sql->type = Q_TABLE;
break;
case op_union:
- s = rel2bin_union(be, rel, refs, &type);
- sql->type = type;
+ s = rel2bin_union(be, rel, refs);
+ sql->type = Q_TABLE;
break;
case op_except:
s = rel2bin_except(be, rel, refs);
@@ -5251,6 +5258,8 @@ exp_deps(sql_allocator *sa, sql_exp *e,
} else if (e->flag & PSM_REL) {
sql_rel *rel = e->l;
rel_deps(sa, rel, refs, l);
+ } else if (e->flag & PSM_EXCEPTION) {
+ return exps_deps(sa, e->l, refs, l);
}
case e_atom:
case e_column:
@@ -5380,7 +5389,7 @@ rel_deps(sql_allocator *sa, sql_rel *r,
if (r->flag == DDL_OUTPUT) {
if (r->l)
return rel_deps(sa, r->l, refs, l);
- } else if (r->flag <= DDL_LIST) {
+ } else if (r->flag <= DDL_LIST || r->flag == DDL_DISTRIBUTE) {
if (r->l)
return rel_deps(sa, r->l, refs, l);
if (r->r)
diff --git a/sql/include/sql_relation.h b/sql/include/sql_relation.h
--- a/sql/include/sql_relation.h
+++ b/sql/include/sql_relation.h
@@ -81,6 +81,7 @@ typedef struct expression {
#define PSM_WHILE 8
#define PSM_IF 16
#define PSM_REL 32
+#define PSM_EXCEPTION 64
#define SET_PSM_LEVEL(level) (level<<8)
#define GET_PSM_LEVEL(level) (level>>8)
@@ -90,6 +91,7 @@ typedef struct expression {
#define DDL_OUTPUT 1
#define DDL_LIST 2
#define DDL_PSM 3
+#define DDL_DISTRIBUTE 4
#define DDL_CREATE_SEQ 5
#define DDL_ALTER_SEQ 6
diff --git a/sql/rel.txt b/sql/rel.txt
--- a/sql/rel.txt
+++ b/sql/rel.txt
@@ -23,11 +23,11 @@ TABLE (card MULTI)
*/
DDL (card 0!, top of the tree always)
-> flags /* OUTPUT, TRANSACTION-types, CREATE/DROP/ALTER* */
- -> exps For 'OUTPUT' is list of output options
+ -> exps For 'OUTPUT' is list of output options, for DISTRIBUTE
has a single exception expression
For 'transactions' simple flags
for CREATE etc full sql string.
- -> l only used for 'OUTPUT' relation to output
- -> r (only for DDL_LIST)
+ -> l used in DDL_OUTPUT as the relation to output and
DDL_DISTRIBUTE as a DDL_LIST
+ -> r used in DDL_LIST and DDL_DISTRIBUTE
SELECT (card MULTI or same ...?)
-> exps selection expressions (ie all e_cmp)
@@ -160,5 +160,5 @@ e_psm
->r loop_exp_list
psm_return ->l return_exp
psm_rel ->l relation
- psm_exception ->l column_exp
- ->r cmp_exp
+ psm_exception ->l cond_exp
+ ->r error_string
diff --git a/sql/server/rel_distribute.c b/sql/server/rel_distribute.c
--- a/sql/server/rel_distribute.c
+++ b/sql/server/rel_distribute.c
@@ -114,8 +114,10 @@ exp_replica(mvc *sql, sql_exp *e, char *
e->f = exps_replica(sql, e->f, uri);
return e;
}
- if (e->flag & PSM_REL)
+ if (e->flag & PSM_REL)
e->l = replica(sql, e->l, uri);
+ if (e->flag & PSM_EXCEPTION)
+ e->l = exp_replica(sql, e->l, uri);
return e;
}
@@ -205,7 +207,7 @@ replica(mvc *sql, sql_rel *rel, char *ur
rel->l = replica(sql, rel->l, uri);
break;
case op_ddl:
- if (rel->flag == DDL_PSM && rel->exps)
+ if ((rel->flag == DDL_PSM || rel->flag == DDL_DISTRIBUTE) &&
rel->exps)
rel->exps = exps_replica(sql, rel->exps, uri);
rel->l = replica(sql, rel->l, uri);
if (rel->r)
@@ -240,8 +242,10 @@ exp_distribute(mvc *sql, sql_exp *e)
e->f = exps_distribute(sql, e->f);
return e;
}
- if (e->flag & PSM_REL)
+ if (e->flag & PSM_REL)
e->l = distribute(sql, e->l);
+ if (e->flag & PSM_EXCEPTION)
+ e->l = exp_distribute(sql, e->l);
return e;
}
@@ -339,7 +343,7 @@ distribute(mvc *sql, sql_rel *rel)
}
break;
case op_ddl:
- if (rel->flag == DDL_PSM && rel->exps)
+ if ((rel->flag == DDL_PSM || rel->flag == DDL_DISTRIBUTE) &&
rel->exps)
rel->exps = exps_distribute(sql, rel->exps);
rel->l = distribute(sql, rel->l);
if (rel->r)
@@ -374,8 +378,10 @@ exp_remote_func(mvc *sql, sql_exp *e)
e->f = exps_remote_func(sql, e->f);
return e;
}
- if (e->flag & PSM_REL)
+ if (e->flag & PSM_REL)
e->l = rel_remote_func(sql, e->l);
+ if (e->flag & PSM_EXCEPTION)
+ e->l = exp_remote_func(sql, e->l);
return e;
}
@@ -424,7 +430,7 @@ rel_remote_func(mvc *sql, sql_rel *rel)
rel->l = rel_remote_func(sql, rel->l);
break;
case op_ddl:
- if (rel->flag == DDL_PSM && rel->exps)
+ if ((rel->flag == DDL_PSM || rel->flag == DDL_DISTRIBUTE) &&
rel->exps)
rel->exps = exps_remote_func(sql, rel->exps);
rel->l = rel_remote_func(sql, rel->l);
if (rel->r)
diff --git a/sql/server/rel_dump.c b/sql/server/rel_dump.c
--- a/sql/server/rel_dump.c
+++ b/sql/server/rel_dump.c
@@ -95,6 +95,10 @@ exp_print(mvc *sql, stream *fout, sql_ex
exps_print(sql, fout, e->f, depth, refs, alias,
0);
} else if (e->flag & PSM_REL) {
rel_print_(sql, fout, e->l, depth+1, refs, 1);
+ } else if (e->flag & PSM_EXCEPTION) {
+ mnstr_printf(fout, "except ");
+ exp_print(sql, fout, e->l, depth, refs, 0, 0);
+ mnstr_printf(fout, " error %s", (const char *) e->r);
}
break;
}
@@ -378,7 +382,7 @@ rel_print_(mvc *sql, stream *fout, sql_
rel_print_(sql, fout, rel->l, depth+1, refs, decorate);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list