Changeset: 637ab3fc435a for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/637ab3fc435a
Modified Files:
sql/backends/monet5/sql.c
sql/backends/monet5/sql_copyinto.h
Branch: directappend
Log Message:
Factor stream handling out of mvc_import_table_wrap
diffs (260 lines):
diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -2972,99 +2972,68 @@ bat2return(MalStkPtr stk, InstrPtr pci,
static char fwftsep[2] = {STREAM_FWF_FIELD_SEP, '\0'};
static char fwfrsep[2] = {STREAM_FWF_RECORD_SEP, '\0'};
-/* str mvc_import_table_wrap(int *res, sql_table **t, unsigned char* *T,
unsigned char* *R, unsigned char* *S, unsigned char* *N, str *fname, lng *sz,
lng *offset, int *besteffort, str *fixed_width, int *onclient, int *escape); */
-str
-mvc_import_table_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+static str
+setup_import_table_stream(Client cntxt, struct csv_parameters *csv_parms,
bstream **bs, bool *from_stdin)
{
- backend *be;
- BAT **b = NULL;
- ssize_t len = 0;
- sql_table *t = *(sql_table **) getArgReference(stk, pci, pci->retc + 0);
- struct csv_parameters csv_parms = {
- .tsep = *getArgReference_str(stk, pci, pci->retc + 1),
- .rsep = *getArgReference_str(stk, pci, pci->retc + 2),
- .ssep = *getArgReference_str(stk, pci, pci->retc + 3),
- .ns = *getArgReference_str(stk, pci, pci->retc + 4),
- .nr = *getArgReference_lng(stk, pci, pci->retc + 6),
- .offset = *getArgReference_lng(stk, pci, pci->retc + 7),
- .best = *getArgReference_int(stk, pci, pci->retc + 8),
- .escape = *getArgReference_int(stk, pci, pci->retc + 11),
- };
-
- const char *fname = *getArgReference_str(stk, pci, pci->retc + 5);
- bool append_directly = false;
- if (csv_parms.best >= 100) {
- // this matches the temporary ugliness in rel2bin_insert
- csv_parms.best -= 100;
- append_directly = true;
- }
- char *fixed_widths = *getArgReference_str(stk, pci, pci->retc + 9);
- int onclient = *getArgReference_int(stk, pci, pci->retc + 10);
str msg = MAL_SUCCEED;
- bstream *bstream_to_destroy;
- bstream *s;
- bool from_stdin;
- stream *ss;
-
- (void) mb; /* NOT USED */
- if ((msg = checkSQLContext(cntxt)) != NULL)
- return msg;
- if (onclient && !cntxt->filetrans)
+ backend *be = cntxt->sqlcontext;
+ stream *s;
+
+ if (csv_parms->onclient && !cntxt->filetrans)
throw(MAL, "sql.copy_from", SQLSTATE(42000) "Cannot transfer
files from client");
- be = cntxt->sqlcontext;
/* The CSV parser expects ssep to have the value 0 if the user does not
* specify a quotation character
*/
- if (*csv_parms.ssep == 0 || strNil(csv_parms.ssep))
- csv_parms.ssep = NULL;
-
- if (strNil(fname))
- fname = NULL;
- if (fname == NULL) {
- s = be->mvc->scanner.rs;
- from_stdin = true;
- bstream_to_destroy = NULL;
+ if (*csv_parms->ssep == 0 || strNil(csv_parms->ssep))
+ csv_parms->ssep = NULL;
+
+ if (strNil(csv_parms->filename))
+ csv_parms->filename = NULL;
+ if (csv_parms->filename == NULL) {
+ *bs = be->mvc->scanner.rs;
+ *from_stdin = true;
} else {
- if (onclient) {
+ if (csv_parms->onclient) {
mnstr_write(be->mvc->scanner.ws, PROMPT3,
sizeof(PROMPT3)-1, 1);
- if (csv_parms.offset > 1 && csv_parms.rsep &&
csv_parms.rsep[0] == '\n' && csv_parms.rsep[1] == '\0') {
+ if (csv_parms->offset > 1 && csv_parms->rsep &&
csv_parms->rsep[0] == '\n' && csv_parms->rsep[1] == '\0') {
/* only let client skip simple lines */
//TODO shouldn't we return an error instead?
mnstr_printf(be->mvc->scanner.ws, "r " LLFMT "
%s\n",
- csv_parms.offset, fname);
- csv_parms.offset = 0;
+ csv_parms->offset,
csv_parms->filename);
+ csv_parms->offset = 0;
} else {
- mnstr_printf(be->mvc->scanner.ws, "r 0 %s\n",
fname);
+ mnstr_printf(be->mvc->scanner.ws, "r 0 %s\n",
csv_parms->filename);
}
- msg = MAL_SUCCEED;
mnstr_flush(be->mvc->scanner.ws, MNSTR_FLUSH_DATA);
while (!be->mvc->scanner.rs->eof)
bstream_next(be->mvc->scanner.rs);
- ss = be->mvc->scanner.rs->s;
+ s = be->mvc->scanner.rs->s;
char buf[80];
- if ((len = mnstr_readline(ss, buf, sizeof(buf))) > 1) {
+ ssize_t len;
+ if ((len = mnstr_readline(s, buf, sizeof(buf))) > 1) {
if (buf[0] == '!' && buf[6] == '!')
- msg = createException(IO,
"sql.copy_from", "%.7s%s: %s", buf, fname, buf+7);
+ msg = createException(IO,
"sql.copy_from", "%.7s%s: %s", buf, csv_parms->filename, buf+7);
else
- msg = createException(IO,
"sql.copy_from", "%s: %s", fname, buf);
+ msg = createException(IO,
"sql.copy_from", "%s: %s", csv_parms->filename, buf);
while (buf[len - 1] != '\n' &&
- (len = mnstr_readline(ss, buf,
sizeof(buf))) > 0)
+ (len = mnstr_readline(s, buf,
sizeof(buf))) > 0)
;
/* read until flush marker */
- while (mnstr_read(ss, buf, 1, sizeof(buf)) > 0)
+ while (mnstr_read(s, buf, 1, sizeof(buf)) > 0)
;
return msg;
}
} else {
- ss = open_rastream(fname);
- if (ss == NULL || mnstr_errnr(ss)) {
+ s = open_rastream(csv_parms->filename);
+ if (s == NULL || mnstr_errnr(s)) {
msg = createException(IO, "sql.copy_from",
SQLSTATE(42000) "%s", mnstr_peek_error(NULL));
- close_stream(ss);
+ close_stream(s);
return msg;
}
}
+ char *fixed_widths = csv_parms->fixed_widths;
if (!strNil(fixed_widths)) {
size_t ncol = 0, current_width_entry = 0, i;
size_t *widths;
@@ -3079,7 +3048,7 @@ mvc_import_table_wrap(Client cntxt, MalB
}
widths = malloc(sizeof(size_t) * ncol);
if (!widths) {
- close_stream(ss);
+ close_stream(s);
throw(MAL, "sql.copy_from", SQLSTATE(HY013)
MAL_MALLOC_FAIL);
}
for (i = 0; i < width_len; i++) {
@@ -3090,36 +3059,81 @@ mvc_import_table_wrap(Client cntxt, MalB
}
}
/* overwrite other delimiters to the ones the FWF
stream uses */
- csv_parms.tsep = fwftsep;
- csv_parms.rsep = fwfrsep;
-
- ns = stream_fwf_create(ss, ncol, widths,
STREAM_FWF_FILLER);
+ csv_parms->tsep = fwftsep;
+ csv_parms->rsep = fwfrsep;
+
+ ns = stream_fwf_create(s, ncol, widths,
STREAM_FWF_FILLER);
if (ns == NULL || mnstr_errnr(ns)) {
msg = createException(IO, "sql.copy_from",
SQLSTATE(42000) "%s", mnstr_peek_error(NULL));
- close_stream(ss);
+ close_stream(s);
free(widths);
return msg;
}
- ss = ns;
+ s = ns;
}
- s = bstream_create(ss, sizeof(void*) == 4 ? 0x20000 : 0x200000);
- bstream_to_destroy = s;
- from_stdin = false;
- if (s == NULL) {
- close_stream(ss);
+ *bs = bstream_create(s, sizeof(void*) == 4 ? 0x20000 :
0x200000);
+ if (*bs == NULL) {
+ close_stream(s);
throw(MAL, "sql.copy_from", SQLSTATE(HY013)
MAL_MALLOC_FAIL);
}
- }
- // extern str mvc_import_table(Client cntxt, BAT ***bats, mvc *c,
bstream *s, bool from_stdin, sql_table *t, struct csv_parameters,
append_directly);
-
- msg = mvc_import_table(cntxt, &b, be->mvc, s, from_stdin, t,
&csv_parms, append_directly);
- if (onclient) {
+ *from_stdin = false;
+
+ }
+
+ return msg;
+}
+
+static void
+teardown_import_table_stream(Client cntxt, struct csv_parameters *csv_parms,
bstream *bs)
+{
+ backend *be = cntxt->sqlcontext;
+ if (csv_parms->onclient) {
mnstr_write(be->mvc->scanner.ws, PROMPT3, sizeof(PROMPT3)-1, 1);
mnstr_flush(be->mvc->scanner.ws, MNSTR_FLUSH_DATA);
- be->mvc->scanner.rs->eof = s->eof;
- s->s = NULL;
- }
- bstream_destroy(bstream_to_destroy);
+ be->mvc->scanner.rs->eof = bs->eof;
+ if (strNil(csv_parms->filename))
+ bstream_destroy(bs);
+ }
+}
+
+str
+mvc_import_table_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ str msg = MAL_SUCCEED;
+
+ if ((msg = checkSQLContext(cntxt)) != NULL)
+ return msg;
+ (void)mb;
+ backend *be = cntxt->sqlcontext;
+
+ sql_table *t = *(sql_table **) getArgReference(stk, pci, pci->retc + 0);
+ struct csv_parameters csv_parms = {
+ .tsep = *getArgReference_str(stk, pci, pci->retc + 1),
+ .rsep = *getArgReference_str(stk, pci, pci->retc + 2),
+ .ssep = *getArgReference_str(stk, pci, pci->retc + 3),
+ .ns = *getArgReference_str(stk, pci, pci->retc + 4),
+ .filename = *getArgReference_str(stk, pci, pci->retc + 5),
+ .nr = *getArgReference_lng(stk, pci, pci->retc + 6),
+ .offset = *getArgReference_lng(stk, pci, pci->retc + 7),
+ .best = *getArgReference_int(stk, pci, pci->retc + 8),
+ .fixed_widths = *getArgReference_str(stk, pci, pci->retc + 9),
+ .onclient = *getArgReference_int(stk, pci, pci->retc + 10),
+ .escape = *getArgReference_int(stk, pci, pci->retc + 11),
+ };
+
+ bool append_directly = false;
+ if (csv_parms.best >= 100) {
+ // this matches the temporary ugliness in rel2bin_insert
+ csv_parms.best -= 100;
+ append_directly = true;
+ }
+
+ bstream *bs;
+ bool from_stdin;
+ BAT **b = NULL;
+ msg = setup_import_table_stream(cntxt, &csv_parms, &bs, &from_stdin);
+ msg = mvc_import_table(cntxt, &b, be->mvc, bs, from_stdin, t,
&csv_parms, append_directly);
+ teardown_import_table_stream(cntxt, &csv_parms, bs);
if (b && !msg)
bat2return(stk, pci, b);
diff --git a/sql/backends/monet5/sql_copyinto.h
b/sql/backends/monet5/sql_copyinto.h
--- a/sql/backends/monet5/sql_copyinto.h
+++ b/sql/backends/monet5/sql_copyinto.h
@@ -17,9 +17,12 @@ struct csv_parameters {
const char *rsep;
const char *ssep;
const char *ns;
+ const char *filename;
lng nr;
lng offset;
int best;
+ char *fixed_widths;
+ bool onclient;
bool escape;
};
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list