Changeset: cd5a6aab6d21 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/cd5a6aab6d21
Modified Files:
sql/backends/monet5/sql.c
sql/backends/monet5/sql.h
sql/backends/monet5/sql_bincopy.c
sql/backends/monet5/sql_bincopyconvert.c
sql/backends/monet5/sql_bincopyconvert.h
Branch: copyfaster
Log Message:
Add operators sql.importRaw and sql.importNulTerminated
diffs (192 lines):
diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -5764,7 +5764,21 @@ static mel_func sql_init_funcs[] = {
pattern("sql", "copy_from", mvc_import_table_wrap, true, "Import a table from
bstream s with the \ngiven tuple and separators (sep/rsep)", args(1,15,
batvarargany("",0),arg("t",ptr),arg("sep",str),arg("rsep",str),arg("ssep",str),arg("ns",str),arg("fname",str),arg("nr",lng),arg("offset",lng),arg("best",int),arg("fwf",str),arg("onclient",int),arg("escape",int),arg("decsep",str),arg("decskip",str))),
//we use bat.single now
//pattern("sql", "single", CMDBATsingle, false, "", args(1,2,
batargany("",2),argany("x",2))),
+
pattern("sql", "importColumn", mvc_bin_import_column_wrap, false, "Import a
column from the given file", args(2, 8, batargany("", 0),arg("", oid),
arg("method",str),arg("width",int),arg("bswap",bit),arg("path",str),arg("onclient",int),arg("nrows",oid))),
+
+ pattern("sql", "importNulTerminated", mvc_bin_import_nul_terminated_wrap,
false, "Import a column from the bytes in the given bat",
+ args(2, 6,
+ batargany("", 0),arg("", oid),
+
arg("method",str),arg("width",int),batarg("bytes",bte),arg("nrows",oid))),
+
+ pattern("sql", "importRaw", mvc_bin_import_bytes_wrap, false, "Import the raw
bytes from the given file",
+ args(2, 5,
+ batargany("", 0),arg("", oid),
+ arg("path",str),arg("onclient",int),arg("nrows",oid)
+ )),
+
+
command("aggr", "not_unique", not_unique, false, "check if the tail sorted
bat b doesn't have unique tail values", args(1,2, arg("",bit),batarg("b",oid))),
command("sql", "optimizers", getPipeCatalog, false, "", args(3,3,
batarg("",str),batarg("",str),batarg("",str))),
pattern("sql", "optimizer_updates", SQLoptimizersUpdate, false, "", noargs),
diff --git a/sql/backends/monet5/sql.h b/sql/backends/monet5/sql.h
--- a/sql/backends/monet5/sql.h
+++ b/sql/backends/monet5/sql.h
@@ -97,6 +97,8 @@ extern str wrap_onclient_compression(str
extern str mvc_export_row_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
extern str mvc_import_table_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
extern str mvc_bin_import_column_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr
stk, InstrPtr pci);
+extern str mvc_bin_import_bytes_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr
stk, InstrPtr pci);
+extern str mvc_bin_import_nul_terminated_wrap(Client cntxt, MalBlkPtr mb,
MalStkPtr stk, InstrPtr pci);
extern str mvc_bin_export_column_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr
stk, InstrPtr pci);
extern str setVariable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
extern str getVariable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
diff --git a/sql/backends/monet5/sql_bincopy.c
b/sql/backends/monet5/sql_bincopy.c
--- a/sql/backends/monet5/sql_bincopy.c
+++ b/sql/backends/monet5/sql_bincopy.c
@@ -332,7 +332,123 @@ mvc_bin_import_column_wrap(Client cntxt,
return import_column(be, ret, retcnt, method, width, byteswap, path,
onclient, nrows);
}
+/*
+ * Implementation of sql.importNulTerminated: turn the contents of the BAT
+ * 'bytes' into a new BAT of the GDK type registered for 'method'.
+ *
+ * The type record for 'method' must exist and must use the zero-terminated
+ * text loader, otherwise an exception is returned. The tail of 'bytes' is
+ * handed to insert_nul_terminated_values() as one buffer of BATcount(bytes)
+ * bytes; if that call succeeds but consumes fewer bytes than were offered,
+ * the import fails with "unterminated string at end".
+ *
+ * On success *ret holds the id of the new BAT and *retcnt its row count.
+ * On failure both are left at 0 and the exception message is returned.
+ * 'be' is unused; 'nrows' is only passed on to COLnew() (presumably as a
+ * capacity hint -- it is not compared with the number of rows produced).
+ */
+static str
+import_nul_terminated(backend *be, bat *ret, BUN *retcnt, str method, int width, bat bytes, BUN nrows)
+{
+	// NOTE(review): 'mal_operator', 'msg' and the 'end' label keep their
+	// names because bailout() is defined outside this block and appears to
+	// depend on them -- confirm against the macro definition.
+	const str mal_operator = "sql.importNulTerminated";
+	str msg = MAL_SUCCEED;
+	BAT *src = NULL;
+	BAT *dst = NULL;
+	allocator *ma = MT_thread_getallocator();
+	allocator_state ma_state = ma_open(ma);
+	struct insert_state state = { NULL };
+	type_record_t *rec;
+	int atom;
+	size_t nbytes;
+	size_t used;
+
+	(void)be;
+
+	// Outputs start out as "nothing imported".
+	*ret = 0;
+	*retcnt = 0;
+
+	// Validate the requested method before touching any BAT.
+	rec = find_type_rec(method);
+	if (rec == NULL)
+		bailout("COPY BINARY FROM not implemented for '%s'", method);
+	if (!is_nul_terminated_text(rec))
+		bailout("'%s' does not import as zero-terminated text", method);
+
+	src = BATdescriptor(bytes);
+	if (src == NULL)
+		bailout("%s", GDK_EXCEPTION);
+
+	atom = ATOMindex(rec->gdk_type);
+	if (atom < 0)
+		bailout("cannot load data as %s: unknown atom type %s", method, rec->gdk_type);
+	dst = COLnew(0, atom, nrows, PERSISTENT);
+	if (dst == NULL)
+		bailout("%s", GDK_EXCEPTION);
+
+	init_insert_state(&state, ma, dst, width);
+	{
+		// The iterator is only needed while the raw bytes are being read.
+		BATiter it = bat_iterator(src);
+		const char *buf = BUNtloc(it, 0);
+
+		nbytes = BATcount(src);
+		msg = insert_nul_terminated_values(&state, buf, nbytes, &used);
+		bat_iterator_end(&it);
+	}
+	if (msg != MAL_SUCCEED)
+		goto end;
+	if (nbytes > used)
+		bailout("unterminated string at end");
+
+	// Nothing is known about the inserted values, so clear every property.
+	BATsetcount(dst, dst->batCount);
+	dst->tsorted = false;
+	dst->trevsorted = false;
+	dst->tkey = false;
+	dst->tnonil = false;
+	dst->tascii = false;
+
+	*retcnt = BATcount(dst);
+	*ret = dst->batCacheid;
+
+end:
+	release_insert_state(&state);
+	ma_close(ma, &ma_state);
+	if (src != NULL)
+		BBPunfix(src->batCacheid);
+	// Hand the result over on success, drop it on failure.
+	if (dst != NULL && msg == MAL_SUCCEED)
+		BBPkeepref(dst);
+	else if (dst != NULL)
+		BBPunfix(dst->batCacheid);
+	return msg;
+}
+
+
+/*
+ * MAL entry point for sql.importNulTerminated.
+ *
+ * Results:   0 = bat of the imported column, 1 = oid row count.
+ * Arguments: 2 = method (str), 3 = width (int), 4 = bytes (bat), 5 = nrows (oid).
+ * Everything is forwarded unchanged to import_nul_terminated().
+ */
+str
+mvc_bin_import_nul_terminated_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+	backend *be = cntxt->sqlcontext;
+	bat *ret;
+	BUN *retcnt;
+
+	(void)mb;
+
+	assert(pci->retc == 2);
+	assert(pci->argc == 6);
+
+	ret = getArgReference_bat(stk, pci, 0);
+	retcnt = getArgReference_oid(stk, pci, 1);
+
+	return import_nul_terminated(be, ret, retcnt,
+		*getArgReference_str(stk, pci, 2),	/* method */
+		*getArgReference_int(stk, pci, 3),	/* width */
+		*getArgReference_bat(stk, pci, 4),	/* bytes */
+		*getArgReference_oid(stk, pci, 5));	/* nrows */
+}
+
+
+/*
+ * MAL entry point for sql.importRaw.
+ *
+ * Results:   0 = bat holding the imported data, 1 = oid row count.
+ * Arguments: 2 = path (str), 3 = onclient (int), 4 = nrows (oid).
+ *
+ * The file is loaded through import_column() with method "bte", width 0,
+ * no byte swapping and a row count of 0. The count reported back in result
+ * 1 is not the one import_column() produced: it is overwritten with the
+ * incoming nrows argument.
+ */
+str
+mvc_bin_import_bytes_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+	backend *be = cntxt->sqlcontext;
+	bat *ret;
+	BUN *retcnt;
+	str path;
+	int onclient;
+	BUN passthrough_nrows;
+	str outcome;
+
+	(void)mb;
+
+	assert(pci->retc == 2);
+	assert(pci->argc == 5);
+
+	ret = getArgReference_bat(stk, pci, 0);
+	retcnt = getArgReference_oid(stk, pci, 1);
+	path = *getArgReference_str(stk, pci, 2);
+	onclient = *getArgReference_int(stk, pci, 3);
+	// Not needed for the load itself, but it MUST be passed on to the
+	// result because it is used to sequence the loads.
+	passthrough_nrows = *getArgReference_oid(stk, pci, 4);
+
+	outcome = import_column(be, ret, retcnt, "bte", 0, false, path, onclient, 0);
+
+	// Report the value we were given, whatever import_column() stored.
+	*retcnt = passthrough_nrows;
+	return outcome;
+}
static str
write_out(const char *start, const char *end, stream *s)
diff --git a/sql/backends/monet5/sql_bincopyconvert.c
b/sql/backends/monet5/sql_bincopyconvert.c
--- a/sql/backends/monet5/sql_bincopyconvert.c
+++ b/sql/backends/monet5/sql_bincopyconvert.c
@@ -571,6 +571,12 @@ end:
return msg;
}
+/*
+ * Tell whether the given type record is loaded with
+ * load_zero_terminated_text. 'rec' is dereferenced unconditionally, so it
+ * must not be NULL.
+ */
+bool
+is_nul_terminated_text(type_record_t *rec)
+{
+	const bool uses_text_loader = (rec->loader == load_zero_terminated_text);
+
+	return uses_text_loader;
+}
+
static str
diff --git a/sql/backends/monet5/sql_bincopyconvert.h
b/sql/backends/monet5/sql_bincopyconvert.h
--- a/sql/backends/monet5/sql_bincopyconvert.h
+++ b/sql/backends/monet5/sql_bincopyconvert.h
@@ -69,6 +69,8 @@ extern bool can_dump_binary_column(type_
extern str dump_binary_column(type_record_t *rec, BAT *b, BUN start, BUN
length, bool byteswap, stream *s);
+extern bool is_nul_terminated_text(type_record_t *rec);
+
struct insert_state {
allocator *ma;
BAT *bat;
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-leave@monetdb.org