Changeset: 0b541fd46b02 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/0b541fd46b02
Modified Files:
        sql/backends/monet5/sql_bincopy.c
        sql/backends/monet5/sql_bincopyconvert.c
        sql/backends/monet5/sql_bincopyconvert.h
Branch: copyfaster
Log Message:

Refactor load_zero_terminated_text

Split I/O and string handling; later on we plan to reuse the string handling.


diffs (269 lines):

diff --git a/sql/backends/monet5/sql_bincopy.c 
b/sql/backends/monet5/sql_bincopy.c
--- a/sql/backends/monet5/sql_bincopy.c
+++ b/sql/backends/monet5/sql_bincopy.c
@@ -185,7 +185,6 @@ static str
 load_column(type_record_t *rec, const char *name, BAT *bat, stream *s, int 
width, bool byteswap, BUN rows_estimate, int *eof_reached)
 {
        static const char mal_operator[] = "sql.importColumn";
-       BUN orig_count, new_count;
        str msg = MAL_SUCCEED;
        BUN rows_added;
 
@@ -195,12 +194,11 @@ load_column(type_record_t *rec, const ch
 
        // sanity check
        assert( (loader != NULL) + (decoder != NULL) + trivial == 1); 
(void)trivial;
+       assert(BATcount(bat) == 0);
 
        if (rec->trivial_if_no_byteswap && !byteswap)
                decoder = NULL;
 
-       orig_count = BATcount(bat);
-
        if (loader) {
                msg = loader(bat, s, eof_reached, width, byteswap);
        } else if (decoder) {
@@ -210,8 +208,7 @@ load_column(type_record_t *rec, const ch
                msg = load_trivial(bat, s, name, rec->validate, width, 
rows_estimate, eof_reached);
        }
 
-       new_count = BATcount(bat);
-       rows_added = new_count - orig_count;
+       rows_added = BATcount(bat);
 
        if (rows_added > 0) {
                // We don't know anything about the data we just loaded
diff --git a/sql/backends/monet5/sql_bincopyconvert.c 
b/sql/backends/monet5/sql_bincopyconvert.c
--- a/sql/backends/monet5/sql_bincopyconvert.c
+++ b/sql/backends/monet5/sql_bincopyconvert.c
@@ -328,87 +328,157 @@ encode_timestamp(void *dst_, void *src_,
        return MAL_SUCCEED;
 }
 
-// Load NUL-terminated items from the stream and put them in the BAT.
+void init_insert_state(struct insert_state *st, allocator *ma, BAT *bat, int 
width) {
+       *st = (struct insert_state) {
+               .bat = bat,
+               .width = width,
+               .scratch = NULL,
+               .schratch_len = 0,
+               .left_over = 0,
+               .ma = ma,
+       };
+};
+
+void release_insert_state(struct insert_state *st) {
+       // No longer needed because we use the .ma allocator which is managed by
+       // our caller: GDKfree(st->scratch);
+       (void)st;
+}
+
+static str
+insert_one_nul_terminated(struct insert_state *st, const char *item, size_t 
item_len)
+{
+       int tpe = BATttype(st->bat);
+       const void *value;
+
+       assert(item_len > 0);
+       assert(item[item_len - 1] == '\0');
+
+       unsigned char *uitem = (unsigned char *)item;
+
+       if (uitem[0] == 0x80 && item_len == 2) {
+               value = ATOMnilptr(tpe);
+       } else {
+               if (!checkUTF8(item)) {
+                       throw(SQL, "insert_nul_terminated", SQLSTATE(42000) 
"malformed utf-8 byte sequence");
+               }
+               if (tpe == TYPE_str) {
+                       if (st->width > 0 && UTF8_strlen(item) > st->width)
+                               throw(SQL, "insert_nul_terminated", "string too 
wide for column");
+                       value = item;
+               } else {
+                       ssize_t n = ATOMfromstr(st->ma, tpe, &st->scratch, 
&st->schratch_len, item, false);
+                       if (n <= 0)
+                               throw(SQL, "insert_nul_terminated", 
GDK_EXCEPTION);
+                       value = st->scratch;
+               }
+       }
+
+       // By now 'value' has been set
+       if (bunfastapp(st->bat, value) != GDK_SUCCEED) {
+               throw(SQL, "insert_nul_terminated", GDK_EXCEPTION);
+       }
+
+       return MAL_SUCCEED;
+}
+
+str insert_some_nul_terminated(struct insert_state *st, const char *data, 
size_t total_len, size_t *consumed)
+{
+       str msg;
+       assert(st->left_over <= total_len);
+
+       const char *start = data; // start of the current item
+       const char *pos = data + st->left_over; // where to start searching
+       const char *limit = data + total_len;
+
+       while (pos < limit) {
+               const char *end = memchr(pos, '\0', limit - pos);
+               if (end == NULL)
+                       break;
+               size_t len = end + 1 - start;
+               msg = insert_one_nul_terminated(st, start, len);
+               if (msg != MAL_SUCCEED)
+                       return msg;
+               start += len;
+               pos = start;
+       }
+
+       *consumed = start - data;
+       st->left_over = data + total_len - start;
+       return MAL_SUCCEED;
+}
+
+// #define DEBUG_PRINTFS
+
 static str
 load_zero_terminated_text(BAT *bat, stream *s, int *eof_reached, int width, 
bool byteswap)
 {
-       (void)byteswap;
-       static const char mal_operator[] = "sql.importColumn";
-       str msg = MAL_SUCCEED;
+       str msg;
+       static const char mal_operator[] = "sql.export_bin_column";
+       const size_t min_read = 8190;
+       allocator *ma = MT_thread_getallocator();
+       allocator_state ma_state = ma_open(ma);
+       struct insert_state state;
        bstream *bs = NULL;
-       int tpe = BATttype(bat);
-       void *buffer = NULL;
-       size_t buffer_len = 0;
-       allocator *sa = create_allocator(NULL, NULL, false);
 
-       // convert_and_validate_utf8() above counts on the following property 
to hold:
-       assert(strNil((const char[2]){ 0x80, 0 }));
-
-       bs = bstream_create(s, 1 << 20);
+       (void)byteswap; // not applicable to strings
+       init_insert_state(&state, ma, bat, width);
+       bs = bstream_create(s, 1<<20);
        if (bs == NULL) {
                msg = createException(SQL, "sql", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
                goto end;
        }
 
-       // In the outer loop we refill the buffer until the stream ends.
-       // In the inner loop we look for complete \0-terminated strings.
        while (1) {
-               ssize_t nread = bstream_next(bs);
+               size_t in_use = bs->len - bs->pos;
+               size_t free = bs->size - in_use;
+               size_t to_read = free >= min_read ? free : min_read;
+               #ifdef DEBUG_PRINTFS
+               fprintf(stderr, "# before read %zu: %zu..%zu/%zu = ", to_read, 
bs->pos, bs->len, bs->size);
+               for (size_t i = bs->pos; i < bs->len; i++)
+                       fprintf(stderr, " %02X", (unsigned char) bs->buf[i]);
+               fprintf(stderr, "\n");
+               #endif
+               ssize_t nread = bstream_read(bs, to_read);
                if (nread < 0)
                        bailout("%s", mnstr_peek_error(s));
-               if (nread == 0)
+               else if (nread == 0) {
+                       assert(bs->eof);
                        break;
-
-               char *buf_start = &bs->buf[bs->pos];
-               char *buf_end = &bs->buf[bs->len];
-               char *start, *end;
-               for (start = buf_start; (end = memchr(start, '\0', buf_end - 
start)) != NULL; start = end + 1) {
-                       char *value;
-                       if (!checkUTF8(start)) {
-                               msg = createException(SQL, 
"load_zero_terminated_text", SQLSTATE(42000) "malformed utf-8 byte sequence");
-                               goto end;
-                       }
-                       if (tpe == TYPE_str) {
-                               if (width > 0 && !strNil(start)) {
-                                       int w = UTF8_strlen(start);
-                                       if (w > width) {
-                                               msg = createException(SQL, 
"sql.importColumn", "string too wide for column");
-                                               goto end;
-                                       }
-                               }
-                               value = start;
-                       } else {
-                               ssize_t n = ATOMfromstr(sa, tpe, &buffer, 
&buffer_len, start, false);
-                               if (n <= 0) {
-                                       msg = createException(SQL, 
"sql.importColumn", GDK_EXCEPTION);
-                                       goto end;
-                               }
-                               value = buffer;
-                       }
-                       if (bunfastapp(bat, value) != GDK_SUCCEED) {
-                               msg = createException(SQL, "sql.importColumn", 
GDK_EXCEPTION);
-                               goto end;
-                       }
                }
-               bs->pos = start - buf_start;
+               #ifdef DEBUG_PRINTFS
+               fprintf(stderr, "# after read %zu: %zu..%zu/%zu = ", nread, 
bs->pos, bs->len, bs->size);
+               for (size_t i = bs->pos; i < bs->len; i++)
+                       fprintf(stderr, " %02X", (unsigned char) bs->buf[i]);
+               fprintf(stderr, "\n");
+               #endif
+               size_t consumed;
+               msg = insert_some_nul_terminated(&state, &bs->buf[bs->pos], 
bs->len - bs->pos, &consumed);
+               #ifdef DEBUG_PRINTFS
+               fprintf(stderr, "# consumed %zu, left_over=%zu, 
batcount=%zu\n", consumed, state.left_over, BATcount(bat));
+               #endif
+               if (msg != MAL_SUCCEED)
+                       goto end;
+               bs->pos += consumed;
        }
 
-       // It's an error to have date left after falling out of the outer loop
        if (bs->pos < bs->len)
                bailout("unterminated string at end");
 
+       msg = MAL_SUCCEED;
 end:
-       *eof_reached = 0;
-       // GDKfree(buffer);
+       release_insert_state(&state);
+       ma_close(ma, &ma_state);
        if (bs != NULL) {
-               *eof_reached = (int)bs->eof;
+               *eof_reached = bs->eof;
                bs->s = NULL;
                bstream_destroy(bs);
        }
-       ma_destroy(sa);
        return msg;
 }
 
+
+
 static str
 dump_zero_terminated_text(BAT *bat, stream *s, BUN start, BUN length, bool 
byteswap)
 {
diff --git a/sql/backends/monet5/sql_bincopyconvert.h 
b/sql/backends/monet5/sql_bincopyconvert.h
--- a/sql/backends/monet5/sql_bincopyconvert.h
+++ b/sql/backends/monet5/sql_bincopyconvert.h
@@ -69,5 +69,16 @@ extern bool can_dump_binary_column(type_
 
 extern str dump_binary_column(type_record_t *rec, BAT *b, BUN start, BUN 
length, bool byteswap, stream *s);
 
+struct insert_state {
+       BAT *bat;
+       int width;
+       void *scratch;
+       size_t schratch_len;
+       size_t left_over;
+       allocator *ma;
+};
+extern void init_insert_state(struct insert_state *st, allocator *ma, BAT 
*bat, int width);
+extern void release_insert_state(struct insert_state *st);
+extern str insert_some_nul_terminated(struct insert_state *st, const char 
*data, size_t total_len, size_t *consumed);
 
 #endif
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to