Changeset: 637ab3fc435a for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/637ab3fc435a
Modified Files:
        sql/backends/monet5/sql.c
        sql/backends/monet5/sql_copyinto.h
Branch: directappend
Log Message:

Factor stream handling out of mvc_import_table_wrap


diffs (260 lines):

diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -2972,99 +2972,68 @@ bat2return(MalStkPtr stk, InstrPtr pci, 
 static char fwftsep[2] = {STREAM_FWF_FIELD_SEP, '\0'};
 static char fwfrsep[2] = {STREAM_FWF_RECORD_SEP, '\0'};
 
-/* str mvc_import_table_wrap(int *res, sql_table **t, unsigned char* *T, 
unsigned char* *R, unsigned char* *S, unsigned char* *N, str *fname, lng *sz, 
lng *offset, int *besteffort, str *fixed_width, int *onclient, int *escape); */
-str
-mvc_import_table_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+static str
+setup_import_table_stream(Client cntxt, struct csv_parameters *csv_parms, 
bstream **bs, bool *from_stdin)
 {
-       backend *be;
-       BAT **b = NULL;
-       ssize_t len = 0;
-       sql_table *t = *(sql_table **) getArgReference(stk, pci, pci->retc + 0);
-       struct csv_parameters csv_parms = {
-               .tsep = *getArgReference_str(stk, pci, pci->retc + 1),
-               .rsep = *getArgReference_str(stk, pci, pci->retc + 2),
-               .ssep = *getArgReference_str(stk, pci, pci->retc + 3),
-               .ns = *getArgReference_str(stk, pci, pci->retc + 4),
-               .nr = *getArgReference_lng(stk, pci, pci->retc + 6),
-               .offset = *getArgReference_lng(stk, pci, pci->retc + 7),
-               .best = *getArgReference_int(stk, pci, pci->retc + 8),
-               .escape = *getArgReference_int(stk, pci, pci->retc + 11),
-       };
-
-       const char *fname = *getArgReference_str(stk, pci, pci->retc + 5);
-       bool append_directly = false;
-       if (csv_parms.best >= 100) {
-               // this matches the temporary ugliness in rel2bin_insert
-               csv_parms.best -= 100;
-               append_directly = true;
-       }
-       char *fixed_widths = *getArgReference_str(stk, pci, pci->retc + 9);
-       int onclient = *getArgReference_int(stk, pci, pci->retc + 10);
        str msg = MAL_SUCCEED;
-       bstream *bstream_to_destroy;
-       bstream *s;
-       bool from_stdin;
-       stream *ss;
-
-       (void) mb;              /* NOT USED */
-       if ((msg = checkSQLContext(cntxt)) != NULL)
-               return msg;
-       if (onclient && !cntxt->filetrans)
+       backend *be = cntxt->sqlcontext;
+       stream *s;
+
+       if (csv_parms->onclient && !cntxt->filetrans)
                throw(MAL, "sql.copy_from", SQLSTATE(42000) "Cannot transfer 
files from client");
 
-       be = cntxt->sqlcontext;
        /* The CSV parser expects ssep to have the value 0 if the user does not
         * specify a quotation character
         */
-       if (*csv_parms.ssep == 0 || strNil(csv_parms.ssep))
-               csv_parms.ssep = NULL;
-
-       if (strNil(fname))
-               fname = NULL;
-       if (fname == NULL) {
-               s = be->mvc->scanner.rs;
-               from_stdin = true;
-               bstream_to_destroy = NULL;
+       if (*csv_parms->ssep == 0 || strNil(csv_parms->ssep))
+               csv_parms->ssep = NULL;
+
+       if (strNil(csv_parms->filename))
+               csv_parms->filename = NULL;
+       if (csv_parms->filename == NULL) {
+               *bs = be->mvc->scanner.rs;
+               *from_stdin = true;
        } else {
-               if (onclient) {
+               if (csv_parms->onclient) {
                        mnstr_write(be->mvc->scanner.ws, PROMPT3, 
sizeof(PROMPT3)-1, 1);
-                       if (csv_parms.offset > 1 && csv_parms.rsep && 
csv_parms.rsep[0] == '\n' && csv_parms.rsep[1] == '\0') {
+                       if (csv_parms->offset > 1 && csv_parms->rsep && 
csv_parms->rsep[0] == '\n' && csv_parms->rsep[1] == '\0') {
                                /* only let client skip simple lines */
                                //TODO shouldn't we return an error instead?
                                mnstr_printf(be->mvc->scanner.ws, "r " LLFMT " 
%s\n",
-                                            csv_parms.offset, fname);
-                               csv_parms.offset = 0;
+                                            csv_parms->offset, 
csv_parms->filename);
+                               csv_parms->offset = 0;
                        } else {
-                               mnstr_printf(be->mvc->scanner.ws, "r 0 %s\n", 
fname);
+                               mnstr_printf(be->mvc->scanner.ws, "r 0 %s\n", 
csv_parms->filename);
                        }
-                       msg = MAL_SUCCEED;
                        mnstr_flush(be->mvc->scanner.ws, MNSTR_FLUSH_DATA);
                        while (!be->mvc->scanner.rs->eof)
                                bstream_next(be->mvc->scanner.rs);
-                       ss = be->mvc->scanner.rs->s;
+                       s = be->mvc->scanner.rs->s;
                        char buf[80];
-                       if ((len = mnstr_readline(ss, buf, sizeof(buf))) > 1) {
+                       ssize_t len;
+                       if ((len = mnstr_readline(s, buf, sizeof(buf))) > 1) {
                                if (buf[0] == '!' && buf[6] == '!')
-                                       msg = createException(IO, 
"sql.copy_from", "%.7s%s: %s", buf, fname, buf+7);
+                                       msg = createException(IO, 
"sql.copy_from", "%.7s%s: %s", buf, csv_parms->filename, buf+7);
                                else
-                                       msg = createException(IO, 
"sql.copy_from", "%s: %s", fname, buf);
+                                       msg = createException(IO, 
"sql.copy_from", "%s: %s", csv_parms->filename, buf);
                                while (buf[len - 1] != '\n' &&
-                                      (len = mnstr_readline(ss, buf, 
sizeof(buf))) > 0)
+                                      (len = mnstr_readline(s, buf, 
sizeof(buf))) > 0)
                                        ;
                                /* read until flush marker */
-                               while (mnstr_read(ss, buf, 1, sizeof(buf)) > 0)
+                               while (mnstr_read(s, buf, 1, sizeof(buf)) > 0)
                                        ;
                                return msg;
                        }
                } else {
-                       ss = open_rastream(fname);
-                       if (ss == NULL || mnstr_errnr(ss)) {
+                       s = open_rastream(csv_parms->filename);
+                       if (s == NULL || mnstr_errnr(s)) {
                                msg = createException(IO, "sql.copy_from", 
SQLSTATE(42000) "%s", mnstr_peek_error(NULL));
-                               close_stream(ss);
+                               close_stream(s);
                                return msg;
                        }
                }
 
+               char *fixed_widths = csv_parms->fixed_widths;
                if (!strNil(fixed_widths)) {
                        size_t ncol = 0, current_width_entry = 0, i;
                        size_t *widths;
@@ -3079,7 +3048,7 @@ mvc_import_table_wrap(Client cntxt, MalB
                        }
                        widths = malloc(sizeof(size_t) * ncol);
                        if (!widths) {
-                               close_stream(ss);
+                               close_stream(s);
                                throw(MAL, "sql.copy_from", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
                        }
                        for (i = 0; i < width_len; i++) {
@@ -3090,36 +3059,81 @@ mvc_import_table_wrap(Client cntxt, MalB
                                }
                        }
                        /* overwrite other delimiters to the ones the FWF 
stream uses */
-                       csv_parms.tsep = fwftsep;
-                       csv_parms.rsep = fwfrsep;
-
-                       ns = stream_fwf_create(ss, ncol, widths, 
STREAM_FWF_FILLER);
+                       csv_parms->tsep = fwftsep;
+                       csv_parms->rsep = fwfrsep;
+
+                       ns = stream_fwf_create(s, ncol, widths, 
STREAM_FWF_FILLER);
                        if (ns == NULL || mnstr_errnr(ns)) {
                                msg = createException(IO, "sql.copy_from", 
SQLSTATE(42000) "%s", mnstr_peek_error(NULL));
-                               close_stream(ss);
+                               close_stream(s);
                                free(widths);
                                return msg;
                        }
-                       ss = ns;
+                       s = ns;
                }
-               s = bstream_create(ss, sizeof(void*) == 4 ? 0x20000 : 0x200000);
-               bstream_to_destroy = s;
-               from_stdin = false;
-               if (s == NULL) {
-                       close_stream(ss);
+               *bs = bstream_create(s, sizeof(void*) == 4 ? 0x20000 : 
0x200000);
+               if (*bs == NULL) {
+                       close_stream(s);
                        throw(MAL, "sql.copy_from", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
                }
-       }
-       // extern str mvc_import_table(Client cntxt, BAT ***bats, mvc *c, 
bstream *s, bool from_stdin, sql_table *t, struct csv_parameters, 
append_directly);
-
-       msg = mvc_import_table(cntxt, &b, be->mvc, s, from_stdin, t, 
&csv_parms, append_directly);
-       if (onclient) {
+               *from_stdin = false;
+
+       }
+
+       return msg;
+}
+
+static void
+teardown_import_table_stream(Client cntxt, struct csv_parameters *csv_parms, 
bstream *bs)
+{
+       backend *be = cntxt->sqlcontext;
+       if (csv_parms->onclient) {
                mnstr_write(be->mvc->scanner.ws, PROMPT3, sizeof(PROMPT3)-1, 1);
                mnstr_flush(be->mvc->scanner.ws, MNSTR_FLUSH_DATA);
-               be->mvc->scanner.rs->eof = s->eof;
-               s->s = NULL;
-       }
-       bstream_destroy(bstream_to_destroy);
+               be->mvc->scanner.rs->eof = bs->eof;
+               if (strNil(csv_parms->filename))
+                       bstream_destroy(bs);
+       }
+}
+
+str
+mvc_import_table_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       str msg = MAL_SUCCEED;
+
+       if ((msg = checkSQLContext(cntxt)) != NULL)
+               return msg;
+       (void)mb;
+       backend *be = cntxt->sqlcontext;
+
+       sql_table *t = *(sql_table **) getArgReference(stk, pci, pci->retc + 0);
+       struct csv_parameters csv_parms = {
+               .tsep = *getArgReference_str(stk, pci, pci->retc + 1),
+               .rsep = *getArgReference_str(stk, pci, pci->retc + 2),
+               .ssep = *getArgReference_str(stk, pci, pci->retc + 3),
+               .ns = *getArgReference_str(stk, pci, pci->retc + 4),
+               .filename = *getArgReference_str(stk, pci, pci->retc + 5),
+               .nr = *getArgReference_lng(stk, pci, pci->retc + 6),
+               .offset = *getArgReference_lng(stk, pci, pci->retc + 7),
+               .best = *getArgReference_int(stk, pci, pci->retc + 8),
+               .fixed_widths = *getArgReference_str(stk, pci, pci->retc + 9),
+               .onclient = *getArgReference_int(stk, pci, pci->retc + 10),
+               .escape = *getArgReference_int(stk, pci, pci->retc + 11),
+       };
+
+       bool append_directly = false;
+       if (csv_parms.best >= 100) {
+               // this matches the temporary ugliness in rel2bin_insert
+               csv_parms.best -= 100;
+               append_directly = true;
+       }
+
+       bstream *bs;
+       bool from_stdin;
+       BAT **b = NULL;
+       msg = setup_import_table_stream(cntxt, &csv_parms, &bs, &from_stdin);
+       msg = mvc_import_table(cntxt, &b, be->mvc, bs, from_stdin, t, 
&csv_parms, append_directly);
+       teardown_import_table_stream(cntxt, &csv_parms, bs);
 
        if (b && !msg)
                bat2return(stk, pci, b);
diff --git a/sql/backends/monet5/sql_copyinto.h b/sql/backends/monet5/sql_copyinto.h
--- a/sql/backends/monet5/sql_copyinto.h
+++ b/sql/backends/monet5/sql_copyinto.h
@@ -17,9 +17,12 @@ struct csv_parameters {
        const char *rsep;
        const char *ssep;
        const char *ns;
+       const char *filename;
        lng nr;
        lng offset;
        int best;
+       char *fixed_widths;
+       bool onclient;
        bool escape;
 };
 
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to