Changeset: cd5a6aab6d21 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/cd5a6aab6d21
Modified Files:
        sql/backends/monet5/sql.c
        sql/backends/monet5/sql.h
        sql/backends/monet5/sql_bincopy.c
        sql/backends/monet5/sql_bincopyconvert.c
        sql/backends/monet5/sql_bincopyconvert.h
Branch: copyfaster
Log Message:

Add operators sql.importRaw and sql.importNulTerminated


diffs (192 lines):

diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -5764,7 +5764,21 @@ static mel_func sql_init_funcs[] = {
  pattern("sql", "copy_from", mvc_import_table_wrap, true, "Import a table from 
bstream s with the \ngiven tuple and separators (sep/rsep)", args(1,15, 
batvarargany("",0),arg("t",ptr),arg("sep",str),arg("rsep",str),arg("ssep",str),arg("ns",str),arg("fname",str),arg("nr",lng),arg("offset",lng),arg("best",int),arg("fwf",str),arg("onclient",int),arg("escape",int),arg("decsep",str),arg("decskip",str))),
  //we use bat.single now
  //pattern("sql", "single", CMDBATsingle, false, "", args(1,2, 
batargany("",2),argany("x",2))),
+
  pattern("sql", "importColumn", mvc_bin_import_column_wrap, false, "Import a 
column from the given file", args(2, 8, batargany("", 0),arg("", oid), 
arg("method",str),arg("width",int),arg("bswap",bit),arg("path",str),arg("onclient",int),arg("nrows",oid))),
+
+ pattern("sql", "importNulTerminated", mvc_bin_import_nul_terminated_wrap, 
false, "Import a column from the bytes in the given bat",
+       args(2, 6,
+               batargany("", 0),arg("", oid),
+               
arg("method",str),arg("width",int),batarg("bytes",bte),arg("nrows",oid))),
+
+ pattern("sql", "importRaw", mvc_bin_import_bytes_wrap, false, "Import the raw 
bytes from the given file",
+       args(2, 5,
+               batargany("", 0),arg("", oid),
+               arg("path",str),arg("onclient",int),arg("nrows",oid)
+ )),
+
+
  command("aggr", "not_unique", not_unique, false, "check if the tail sorted 
bat b doesn't have unique tail values", args(1,2, arg("",bit),batarg("b",oid))),
  command("sql", "optimizers", getPipeCatalog, false, "", args(3,3, 
batarg("",str),batarg("",str),batarg("",str))),
  pattern("sql", "optimizer_updates", SQLoptimizersUpdate, false, "", noargs),
diff --git a/sql/backends/monet5/sql.h b/sql/backends/monet5/sql.h
--- a/sql/backends/monet5/sql.h
+++ b/sql/backends/monet5/sql.h
@@ -97,6 +97,8 @@ extern str wrap_onclient_compression(str
 extern str mvc_export_row_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 extern str mvc_import_table_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 extern str mvc_bin_import_column_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr 
stk, InstrPtr pci);
+extern str mvc_bin_import_bytes_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr 
stk, InstrPtr pci); // MAL pattern registered in sql.c as sql.importRaw
+extern str mvc_bin_import_nul_terminated_wrap(Client cntxt, MalBlkPtr mb, 
MalStkPtr stk, InstrPtr pci); // MAL pattern registered in sql.c as sql.importNulTerminated
 extern str mvc_bin_export_column_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr 
stk, InstrPtr pci);
 extern str setVariable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 extern str getVariable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
diff --git a/sql/backends/monet5/sql_bincopy.c 
b/sql/backends/monet5/sql_bincopy.c
--- a/sql/backends/monet5/sql_bincopy.c
+++ b/sql/backends/monet5/sql_bincopy.c
@@ -332,7 +332,123 @@ mvc_bin_import_column_wrap(Client cntxt,
        return import_column(be, ret, retcnt, method, width, byteswap, path, 
onclient, nrows);
 }
 
+static str // Build a new column BAT from the NUL-terminated values packed in the BAT `bytes`.
+import_nul_terminated(backend *be, bat *ret, BUN *retcnt, str method, int 
width, bat bytes, BUN nrows)
+{ // On success *ret receives the new BAT's id and *retcnt its row count; both are zeroed below before any work is done.
+       (void)be; // the backend handle is not used by this function
+       const str mal_operator = "sql.importNulTerminated"; // not referenced directly below; presumably consumed by the bailout macro -- confirm
+       str msg = MAL_SUCCEED;
+       BAT *input = NULL;
+       BAT *result = NULL;
+       int gdk_type;
+       allocator *ma = MT_thread_getallocator();
+       allocator_state ma_state = ma_open(ma); // paired with the ma_close at `end`
+       struct insert_state state = { NULL }; // zero-initialised up front: `end` calls release_insert_state even if init_insert_state was never reached
+       BATiter bi;
+       const char *data;
+       size_t size;
+       size_t consumed; // only read after insert_nul_terminated_values reported success
 
+       *ret = 0;
+       *retcnt = 0;
+       type_record_t *rec = find_type_rec(method);
+       if (rec == NULL)
+               bailout("COPY BINARY FROM not implemented for '%s'", method); // bailout is defined outside this hunk; presumably sets msg and jumps to `end` -- confirm
+       if (!is_nul_terminated_text(rec)) // only methods whose loader is load_zero_terminated_text are accepted
+               bailout("'%s' does not import as zero-terminated text", method);
+
+       input = BATdescriptor(bytes);
+       if (input == NULL)
+               bailout("%s", GDK_EXCEPTION);
+
+       gdk_type = ATOMindex(rec->gdk_type); // the type record names the atom type; translate it to an atom index
+       if (gdk_type < 0)
+               bailout("cannot load data as %s: unknown atom type %s", method, 
rec->gdk_type);
+       result = COLnew(0, gdk_type, nrows, PERSISTENT); // nrows is passed as COLnew's third argument; presumably an initial capacity -- confirm
+       if (result == NULL)
+               bailout("%s", GDK_EXCEPTION);
+
+       init_insert_state(&state, ma, result, width);
+       bi = bat_iterator(input);
+       data = BUNtloc(bi, 0); // address of the first tail value of the input BAT
+       size = BATcount(input); // NOTE(review): the element count is used as a byte count; this assumes a bte BAT, which is not checked here -- confirm
+       msg = insert_nul_terminated_values(&state, data, size, &consumed);
+       bat_iterator_end(&bi); // end the iterator before inspecting msg so it is released on both paths
+       if (msg != MAL_SUCCEED)
+               goto end;
+       if (consumed < size) // input bytes were left over after the last value that was inserted
+               bailout("unterminated string at end");
+
+       // Maintain bookkeeping
+       BATsetcount(result, result->batCount); // NOTE(review): sets the count to its current value; presumably done for BATsetcount's side effects -- confirm
+       result->tkey = false; // the property flags below are all set to false; nothing is claimed about the loaded data
+       result->tnonil = false;
+       result->tsorted = false;
+       result->trevsorted = false;
+       result->tascii = false;
+
+       *ret = result->batCacheid;
+       *retcnt = BATcount(result);
+       msg = MAL_SUCCEED;
+
+end: // cleanup; reached by falling through on success and by `goto end` on failure
+       release_insert_state(&state);
+       ma_close(ma, &ma_state);
+       if (input != NULL)
+               BBPunfix(input->batCacheid);
+       if (result != NULL) {
+               if (msg == MAL_SUCCEED)
+                       BBPkeepref(result); // success: the BAT is kept, its id was stored in *ret above
+               else
+                       BBPunfix(result->batCacheid); // failure: drop our reference to the partially filled BAT
+       }
+       return msg;
+}
+
+// MAL entry point for sql.importNulTerminated: unpacks the stack arguments and delegates to import_nul_terminated.
+str
+mvc_bin_import_nul_terminated_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci)
+{
+       (void)mb; // the MAL block is not needed here
+
+       assert(pci->retc == 2); // two results: the new BAT (arg 0) and a row count (arg 1)
+       bat *ret = getArgReference_bat(stk, pci, 0);
+       BUN *retcnt = getArgReference_oid(stk, pci, 1);
+
+       assert(pci->argc == 6); // results included, so the four inputs are args 2..5
+       str method = *getArgReference_str(stk, pci, 2);
+       int width = *getArgReference_int(stk, pci, 3);
+       bat bytes = *getArgReference_bat(stk, pci, 4); // id of the BAT holding the bytes to parse
+       BUN nrows = *getArgReference_oid(stk, pci, 5);
+
+       backend *be = cntxt->sqlcontext;
+
+       return import_nul_terminated(be, ret, retcnt, method, width, bytes, 
nrows);
+}
+
+// MAL entry point for sql.importRaw: runs import_column with method "bte" on the given path and echoes nrows back as the count.
+str
+mvc_bin_import_bytes_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci)
+{
+       (void)mb; // the MAL block is not needed here
+
+       assert(pci->retc == 2); // two results: the new BAT (arg 0) and a row count (arg 1)
+       bat *ret = getArgReference_bat(stk, pci, 0);
+       BUN *retcnt = getArgReference_oid(stk, pci, 1);
+
+       assert(pci->argc == 5); // results included, so the three inputs are args 2..4
+       str path = *getArgReference_str(stk, pci, 2);
+       int onclient = *getArgReference_int(stk, pci, 3);
+       // we don't use it ourselves but we MUST pass it on because
+       // we use it to sequence the loads
+       BUN ignored_nrows = *getArgReference_oid(stk, pci, 4);
+
+       backend *be = cntxt->sqlcontext;
+       str retval = import_column(be, ret, retcnt, "bte", 0, false, path, 
onclient, 0); // fixed arguments: method "bte", width 0, no byte swap, nrows 0
+
+       *retcnt = ignored_nrows; // just pass the value we got -- NOTE(review): this overwrites the count even when retval is an error
+       return retval;
+}
 
 static str
 write_out(const char *start, const char *end, stream *s)
diff --git a/sql/backends/monet5/sql_bincopyconvert.c 
b/sql/backends/monet5/sql_bincopyconvert.c
--- a/sql/backends/monet5/sql_bincopyconvert.c
+++ b/sql/backends/monet5/sql_bincopyconvert.c
@@ -571,6 +571,12 @@ end:
        return msg;
 }
 
+bool // Report whether this type record is loaded with load_zero_terminated_text.
+is_nul_terminated_text(type_record_t *rec)
+{
+       return rec->loader == load_zero_terminated_text; // rec is dereferenced unchecked; the caller in sql_bincopy.c rejects NULL first
+}
+
 
 
 static str
diff --git a/sql/backends/monet5/sql_bincopyconvert.h 
b/sql/backends/monet5/sql_bincopyconvert.h
--- a/sql/backends/monet5/sql_bincopyconvert.h
+++ b/sql/backends/monet5/sql_bincopyconvert.h
@@ -69,6 +69,8 @@ extern bool can_dump_binary_column(type_
 
 extern str dump_binary_column(type_record_t *rec, BAT *b, BUN start, BUN 
length, bool byteswap, stream *s);
 
+extern bool is_nul_terminated_text(type_record_t *rec); // true when rec's loader is the zero-terminated text loader
+
 struct insert_state {
        allocator *ma;
        BAT *bat;
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to