Changeset: d38a839b1d0a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d38a839b1d0a
Modified Files:
monetdb5/extras/pyapi/pyapi.c
monetdb5/extras/pyapi/pyapi.h
monetdb5/extras/pyapi/pyapi.mal
sql/backends/monet5/sql_gencode.c
sql/include/sql_catalog.h
sql/server/rel_psm.c
sql/server/rel_select.c
sql/server/rel_semantic.c
sql/server/rel_updates.c
sql/server/sql_mvc.h
sql/server/sql_parser.h
sql/server/sql_parser.y
sql/server/sql_scan.c
Branch: pythonloader
Log Message:
first stage COPY INTO sometable FROM LOADER somepythonfunction();
diffs (truncated from 368 to 300 lines):
diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c
--- a/monetdb5/extras/pyapi/pyapi.c
+++ b/monetdb5/extras/pyapi/pyapi.c
@@ -418,6 +418,12 @@ PyAPIevalAggr(Client cntxt, MalBlkPtr mb
}
str
+PyAPIevalLoader(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{ /* C implementation behind the MAL pattern eval_loader ("loader functions through Python"). */
+ return PyAPIeval(cntxt, mb, stk, pci, 0, 0); /* NOTE(review): the two trailing zeros are presumably the "grouped" and "mapped" flags (PyAPIevalAggrMap passes 1, 1) -- confirm against PyAPIeval */
+}
+
+str
PyAPIevalAggrMap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
return PyAPIeval(cntxt, mb, stk, pci, 1, 1);
diff --git a/monetdb5/extras/pyapi/pyapi.h b/monetdb5/extras/pyapi/pyapi.h
--- a/monetdb5/extras/pyapi/pyapi.h
+++ b/monetdb5/extras/pyapi/pyapi.h
@@ -96,6 +96,7 @@ pyapi_export str PyAPIevalStd(Client cnt
pyapi_export str PyAPIevalAggr(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
pyapi_export str PyAPIevalStdMap(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
pyapi_export str PyAPIevalAggrMap(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
+pyapi_export str PyAPIevalLoader(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
pyapi_export str PyAPIprelude(void *ret);
diff --git a/monetdb5/extras/pyapi/pyapi.mal b/monetdb5/extras/pyapi/pyapi.mal
--- a/monetdb5/extras/pyapi/pyapi.mal
+++ b/monetdb5/extras/pyapi/pyapi.mal
@@ -25,6 +25,10 @@ pattern eval_aggr(fptr:ptr,expr:str,arg:
address PyAPIevalAggr
comment "grouped aggregates through Python";
+pattern eval_loader(fptr:ptr,expr:str,arg:any...):any...
+address PyAPIevalLoader
+comment "loader functions through Python";
+
# initializer code
command prelude() :void address PyAPIprelude;
pyapi.prelude();
@@ -44,6 +48,10 @@ pattern eval_aggr(fptr:ptr,expr:str,arg:
address PyAPIevalAggr
comment "grouped aggregates through Python";
+pattern eval_loader(fptr:ptr,expr:str,arg:any...):any...
+address PyAPIevalLoader
+comment "loader functions through Python";
+
module pyapimap;
# The generic Python interface
diff --git a/sql/backends/monet5/sql_gencode.c b/sql/backends/monet5/sql_gencode.c
--- a/sql/backends/monet5/sql_gencode.c
+++ b/sql/backends/monet5/sql_gencode.c
@@ -2135,7 +2135,7 @@ static int
if (f->func->lang == FUNC_LANG_R || f->func->lang ==
FUNC_LANG_PY || f->func->lang == FUNC_LANG_MAP_PY)
q = pushStr(mb, q, f->func->query);
/* first dynamic output of copy* functions */
- if (f->func->type == F_UNION)
+ if (f->func->type == F_UNION || f->func->type ==
F_LOADER)
q = table_func_create_result(mb, q, f->func,
f->res);
if (list_length(s->op1->op4.lval))
tpe = tail_type(s->op1->op4.lval->h->data);
@@ -3038,6 +3038,10 @@ backend_create_py_func(backend *be, sql_
f->mod = "pyapi";
f->imp = "eval_aggr";
break;
+ case F_LOADER:
+ f->mod = "pyapi";
+ f->imp = "eval_loader";
+ break;
case F_PROC: /* no output */
case F_FUNC:
default: /* ie also F_FILT and F_UNION for now */
diff --git a/sql/include/sql_catalog.h b/sql/include/sql_catalog.h
--- a/sql/include/sql_catalog.h
+++ b/sql/include/sql_catalog.h
@@ -281,6 +281,7 @@ typedef struct sql_arg {
#define F_FILT 4
#define F_UNION 5
#define F_ANALYTIC 6
+#define F_LOADER 7
#define IS_FUNC(f) (f->type == F_FUNC)
#define IS_PROC(f) (f->type == F_PROC)
@@ -288,6 +289,7 @@ typedef struct sql_arg {
#define IS_FILT(f) (f->type == F_FILT)
#define IS_UNION(f) (f->type == F_UNION)
#define IS_ANALYTIC(f) (f->type == F_ANALYTIC)
+#define IS_LOADER(f) (f->type == F_LOADER)
#define FUNC_LANG_INT 0 /* internal */
#define FUNC_LANG_MAL 1 /* create sql external mod.func */
diff --git a/sql/server/rel_psm.c b/sql/server/rel_psm.c
--- a/sql/server/rel_psm.c
+++ b/sql/server/rel_psm.c
@@ -699,10 +699,12 @@ rel_create_func(mvc *sql, dlist *qname,
char is_table = (res && res->token == SQL_TABLE);
char is_aggr = (type == F_AGGR);
char is_func = (type != F_PROC);
- char *F = is_aggr?"AGGREGATE":(is_func?"FUNCTION":"PROCEDURE");
+ char is_loader = (type != F_LOADER);
+
+ char *F =
is_loader?"LOADER":(is_aggr?"AGGREGATE":(is_func?"FUNCTION":"PROCEDURE"));
char *KF = type==F_FILT?"FILTER ": type==F_UNION?"UNION ": "";
- assert(res || type == F_PROC || type == F_FILT);
+ assert(res || type == F_PROC || type == F_FILT || type == F_LOADER);
if (is_table)
type = F_UNION;
@@ -784,18 +786,18 @@ rel_create_func(mvc *sql, dlist *qname,
(lang ==
FUNC_LANG_MAP_PY)?"pyapimap":"unknown";
sql->params = NULL;
if (create) {
- f = mvc_create_func(sql, sql->sa, s,
fname, l, restype, type, lang, mod, fname, lang_body, FALSE, vararg);
+ f = mvc_create_func(sql, sql->sa, s,
fname, l, restype, type, lang, mod, fname, lang_body, (type ==
F_LOADER)?TRUE:FALSE, vararg);
} else if (!sf) {
return sql_error(sql, 01, "CREATE %s%s:
R function %s.%s not bound", KF, F, s->base.name, fname );
- } else {
+ } /*else {
sql_func *f = sf->func;
f->mod = _STRDUP("rapi");
f->imp = _STRDUP("eval");
if (res && restype)
f->res = restype;
- f->sql = 0; /* native */
+ f->sql = 0;
f->lang = FUNC_LANG_INT;
- }
+ }*/
} else if (body) {
sql_arg *ra = (restype &&
!is_table)?restype->h->data:NULL;
list *b = NULL;
diff --git a/sql/server/rel_select.c b/sql/server/rel_select.c
--- a/sql/server/rel_select.c
+++ b/sql/server/rel_select.c
@@ -602,7 +602,7 @@ rel_op_(mvc *sql, sql_schema *s, char *f
f = sql_bind_func(sql->sa, s, fname, NULL, NULL, type);
if (f &&
- ((ek.card == card_none && !f->res) ||
+ ((ek.card == card_none && !f->res) ||
(ek.card != card_none && f->res))) {
return exp_op(sql->sa, NULL, f);
} else {
diff --git a/sql/server/rel_semantic.c b/sql/server/rel_semantic.c
--- a/sql/server/rel_semantic.c
+++ b/sql/server/rel_semantic.c
@@ -155,6 +155,7 @@ rel_semantic(mvc *sql, symbol *s)
case SQL_DELETE:
case SQL_COPYFROM:
case SQL_BINCOPYFROM:
+ case SQL_COPYLOADER:
case SQL_COPYTO:
return rel_updates(sql, s);
diff --git a/sql/server/rel_updates.c b/sql/server/rel_updates.c
--- a/sql/server/rel_updates.c
+++ b/sql/server/rel_updates.c
@@ -15,6 +15,7 @@
#include "sql_privileges.h"
#include "rel_optimizer.h"
#include "rel_dump.h"
+#include "rel_psm.h"
#include "sql_symbol.h"
static sql_exp *
@@ -1452,6 +1453,86 @@ bincopyfrom(mvc *sql, dlist *qname, dlis
return res;
}
+
+/*
+ * Build the plan for COPY INTO <table> FROM LOADER <function>().
+ *
+ * sql    compilation context
+ * qname  (optionally schema-qualified) name of the target table
+ * fcall  parsed loader call; the head of its list holds the (optionally
+ *        schema-qualified) function name
+ *
+ * Binds the target table and the F_LOADER function, sets the function's
+ * result to the table's column types, and wraps the call in a
+ * table-function relation that is handed to rel_insert_table().
+ * Returns NULL when one of the checks below fails.
+ */
+static sql_rel *
+copyfromloader(mvc *sql, dlist *qname, symbol *fcall)
+{
+	char *sname = qname_schema(qname);
+	char *tname = qname_table(qname);
+	sql_schema *s = NULL;
+	sql_table *t = NULL;
+
+	dnode *l = fcall->data.lval->h;
+	char *fname = qname_fname(l->data.lval);
+	char *f_sname = qname_schema(l->data.lval);
+	sql_schema *f_s = sql->session->schema;
+	sql_subfunc *f = NULL;
+
+	node *n;
+	sql_rel *res;
+	list *exps;
+	list *args = NULL;	/* TODO: forward the loader call's arguments */
+	sql_exp *import;
+
+	if (!copy_allowed(sql, 1)) {
+		(void) sql_error(sql, 02, "COPY INTO: insufficient privileges: "
+				 "COPY INTO FROM LOADER requires database administrator rights");
+		return NULL;
+	}
+
+	/* Resolve the target table: explicit schema first, otherwise the
+	 * current schema, then the tmp schema, then the variable stack. */
+	if (sname && !(s = mvc_bind_schema(sql, sname))) {
+		(void) sql_error(sql, 02, "3F000!COPY INTO: no such schema '%s'", sname);
+		return NULL;
+	}
+	if (!s)
+		s = cur_schema(sql);
+	t = mvc_bind_table(sql, s, tname);
+	if (!t && !sname) {
+		s = tmp_schema(sql);
+		t = mvc_bind_table(sql, s, tname);
+		if (!t)
+			t = stack_find_table(sql, tname);
+	}
+	/* NOTE(review): t may still be NULL here; insert_allowed() is
+	 * presumably what rejects that case -- confirm. */
+	if (insert_allowed(sql, t, tname, "COPY INTO", "copy into") == NULL)
+		return NULL;
+
+	/* An explicit schema on the loader function overrides the session
+	 * schema.  This must test f_sname: the table's schema name (sname)
+	 * says nothing about where the function lives. */
+	if (f_sname) {
+		f_s = mvc_bind_schema(sql, f_sname);
+		if (!f_s) {
+			(void) sql_error(sql, 02, "3F000!COPY INTO: no such schema '%s'", f_sname);
+			return NULL;
+		}
+	}
+
+	/* TODO: handle parameters to bind correct version */
+	f = sql_bind_func(sql->sa, f_s, fname, NULL, NULL, F_LOADER);
+	if (!f) {
+		(void) sql_error(sql, 02, "3F000!COPY INTO: no such loader function '%s'", fname);
+		return NULL;
+	}
+	/* The loader's output shape is dictated by the table being filled. */
+	f->res = table_column_types(sql->sa, t);
+
+	import = exp_op(sql->sa, args, f);
+	if (!import)
+		return NULL;
+
+	/* One output column expression per column of the target table. */
+	exps = new_exp_list(sql->sa);
+	for (n = t->columns.set->h; n; n = n->next) {
+		sql_column *c = n->data;
+
+		append(exps, exp_column(sql->sa, t->base.name, c->base.name, &c->type, CARD_MULTI, c->null, 0));
+	}
+	res = rel_table_func(sql->sa, NULL, import, exps, 1);
+	res = rel_insert_table(sql, t, t->base.name, res);
+	return res;
+}
+
static sql_rel *
rel_output(mvc *sql, sql_rel *l, sql_exp *sep, sql_exp *rsep, sql_exp *ssep,
sql_exp *null_string, sql_exp *file)
{
@@ -1614,6 +1695,14 @@ rel_updates(mvc *sql, symbol *s)
sql->type = Q_UPDATE;
}
break;
+ case SQL_COPYLOADER:
+ {
+ dlist *l = s->data.lval;
+
+ ret = copyfromloader(sql, l->h->data.lval,
l->h->next->data.sym);
+ sql->type = Q_UPDATE;
+ }
+ break;
case SQL_COPYTO:
{
dlist *l = s->data.lval;
diff --git a/sql/server/sql_mvc.h b/sql/server/sql_mvc.h
--- a/sql/server/sql_mvc.h
+++ b/sql/server/sql_mvc.h
@@ -37,6 +37,8 @@
#define card_column 2
#define card_set 3 /* some operators require only a set (IN/EXISTS) */
#define card_relation 4
+
+
/* allowed to reduce (in the where and having parts we can reduce) */
/* different query execution modes (emode) */
diff --git a/sql/server/sql_parser.h b/sql/server/sql_parser.h
--- a/sql/server/sql_parser.h
+++ b/sql/server/sql_parser.h
@@ -148,6 +148,7 @@ typedef enum tokens {
SQL_ESCAPE,
SQL_COPYFROM,
SQL_BINCOPYFROM,
+ SQL_COPYLOADER,
SQL_COPYTO,
SQL_EXPORT,
SQL_NEXT,
diff --git a/sql/server/sql_parser.y b/sql/server/sql_parser.y
--- a/sql/server/sql_parser.y
+++ b/sql/server/sql_parser.y
@@ -583,7 +583,7 @@ SQLCODE SQLERROR UNDER WHENEVER
%token TEMP TEMPORARY STREAM MERGE REMOTE REPLICA
%token<sval> ASC DESC AUTHORIZATION
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list