Changeset: 0b541fd46b02 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/0b541fd46b02
Modified Files:
sql/backends/monet5/sql_bincopy.c
sql/backends/monet5/sql_bincopyconvert.c
sql/backends/monet5/sql_bincopyconvert.h
Branch: copyfaster
Log Message:
Refactor load_zero_terminated_text
Split I/O and string handling; later on we plan to reuse the string handling.
diffs (269 lines):
diff --git a/sql/backends/monet5/sql_bincopy.c b/sql/backends/monet5/sql_bincopy.c
--- a/sql/backends/monet5/sql_bincopy.c
+++ b/sql/backends/monet5/sql_bincopy.c
@@ -185,7 +185,6 @@ static str
load_column(type_record_t *rec, const char *name, BAT *bat, stream *s, int
width, bool byteswap, BUN rows_estimate, int *eof_reached)
{
static const char mal_operator[] = "sql.importColumn";
- BUN orig_count, new_count;
str msg = MAL_SUCCEED;
BUN rows_added;
@@ -195,12 +194,11 @@ load_column(type_record_t *rec, const ch
// sanity check
assert( (loader != NULL) + (decoder != NULL) + trivial == 1);
(void)trivial;
+ assert(BATcount(bat) == 0);
if (rec->trivial_if_no_byteswap && !byteswap)
decoder = NULL;
- orig_count = BATcount(bat);
-
if (loader) {
msg = loader(bat, s, eof_reached, width, byteswap);
} else if (decoder) {
@@ -210,8 +208,7 @@ load_column(type_record_t *rec, const ch
msg = load_trivial(bat, s, name, rec->validate, width,
rows_estimate, eof_reached);
}
- new_count = BATcount(bat);
- rows_added = new_count - orig_count;
+ rows_added = BATcount(bat);
if (rows_added > 0) {
// We don't know anything about the data we just loaded
diff --git a/sql/backends/monet5/sql_bincopyconvert.c b/sql/backends/monet5/sql_bincopyconvert.c
--- a/sql/backends/monet5/sql_bincopyconvert.c
+++ b/sql/backends/monet5/sql_bincopyconvert.c
@@ -328,87 +328,157 @@ encode_timestamp(void *dst_, void *src_,
return MAL_SUCCEED;
}
-// Load NUL-terminated items from the stream and put them in the BAT.
+void init_insert_state(struct insert_state *st, allocator *ma, BAT *bat, int
width) {
+ *st = (struct insert_state) {
+ .bat = bat,
+ .width = width,
+ .scratch = NULL,
+ .schratch_len = 0,
+ .left_over = 0,
+ .ma = ma,
+ };
+};
+
+void release_insert_state(struct insert_state *st) {
+ // No longer needed because we use the .ma allocator which is managed by
+ // our caller: GDKfree(st->scratch);
+ (void)st;
+}
+
+static str
+insert_one_nul_terminated(struct insert_state *st, const char *item, size_t
item_len)
+{
+ int tpe = BATttype(st->bat);
+ const void *value;
+
+ assert(item_len > 0);
+ assert(item[item_len - 1] == '\0');
+
+ unsigned char *uitem = (unsigned char *)item;
+
+ if (uitem[0] == 0x80 && item_len == 2) {
+ value = ATOMnilptr(tpe);
+ } else {
+ if (!checkUTF8(item)) {
+ throw(SQL, "insert_nul_terminated", SQLSTATE(42000)
"malformed utf-8 byte sequence");
+ }
+ if (tpe == TYPE_str) {
+ if (st->width > 0 && UTF8_strlen(item) > st->width)
+ throw(SQL, "insert_nul_terminated", "string too
wide for column");
+ value = item;
+ } else {
+ ssize_t n = ATOMfromstr(st->ma, tpe, &st->scratch,
&st->schratch_len, item, false);
+ if (n <= 0)
+ throw(SQL, "insert_nul_terminated",
GDK_EXCEPTION);
+ value = st->scratch;
+ }
+ }
+
+ // By now 'value' has been set
+ if (bunfastapp(st->bat, value) != GDK_SUCCEED) {
+ throw(SQL, "insert_nul_terminated", GDK_EXCEPTION);
+ }
+
+ return MAL_SUCCEED;
+}
+
+str insert_some_nul_terminated(struct insert_state *st, const char *data,
size_t total_len, size_t *consumed)
+{
+ str msg;
+ assert(st->left_over <= total_len);
+
+ const char *start = data; // start of the current item
+ const char *pos = data + st->left_over; // where to start searching
+ const char *limit = data + total_len;
+
+ while (pos < limit) {
+ const char *end = memchr(pos, '\0', limit - pos);
+ if (end == NULL)
+ break;
+ size_t len = end + 1 - start;
+ msg = insert_one_nul_terminated(st, start, len);
+ if (msg != MAL_SUCCEED)
+ return msg;
+ start += len;
+ pos = start;
+ }
+
+ *consumed = start - data;
+ st->left_over = data + total_len - start;
+ return MAL_SUCCEED;
+}
+
+// #define DEBUG_PRINTFS
+
static str
load_zero_terminated_text(BAT *bat, stream *s, int *eof_reached, int width,
bool byteswap)
{
- (void)byteswap;
- static const char mal_operator[] = "sql.importColumn";
- str msg = MAL_SUCCEED;
+ str msg;
+ static const char mal_operator[] = "sql.export_bin_column";
+ const size_t min_read = 8190;
+ allocator *ma = MT_thread_getallocator();
+ allocator_state ma_state = ma_open(ma);
+ struct insert_state state;
bstream *bs = NULL;
- int tpe = BATttype(bat);
- void *buffer = NULL;
- size_t buffer_len = 0;
- allocator *sa = create_allocator(NULL, NULL, false);
- // convert_and_validate_utf8() above counts on the following property
to hold:
- assert(strNil((const char[2]){ 0x80, 0 }));
-
- bs = bstream_create(s, 1 << 20);
+ (void)byteswap; // not applicable to strings
+ init_insert_state(&state, ma, bat, width);
+ bs = bstream_create(s, 1<<20);
if (bs == NULL) {
msg = createException(SQL, "sql", SQLSTATE(HY013)
MAL_MALLOC_FAIL);
goto end;
}
- // In the outer loop we refill the buffer until the stream ends.
- // In the inner loop we look for complete \0-terminated strings.
while (1) {
- ssize_t nread = bstream_next(bs);
+ size_t in_use = bs->len - bs->pos;
+ size_t free = bs->size - in_use;
+ size_t to_read = free >= min_read ? free : min_read;
+ #ifdef DEBUG_PRINTFS
+ fprintf(stderr, "# before read %zu: %zu..%zu/%zu = ", to_read,
bs->pos, bs->len, bs->size);
+ for (size_t i = bs->pos; i < bs->len; i++)
+ fprintf(stderr, " %02X", (unsigned char) bs->buf[i]);
+ fprintf(stderr, "\n");
+ #endif
+ ssize_t nread = bstream_read(bs, to_read);
if (nread < 0)
bailout("%s", mnstr_peek_error(s));
- if (nread == 0)
+ else if (nread == 0) {
+ assert(bs->eof);
break;
-
- char *buf_start = &bs->buf[bs->pos];
- char *buf_end = &bs->buf[bs->len];
- char *start, *end;
- for (start = buf_start; (end = memchr(start, '\0', buf_end -
start)) != NULL; start = end + 1) {
- char *value;
- if (!checkUTF8(start)) {
- msg = createException(SQL,
"load_zero_terminated_text", SQLSTATE(42000) "malformed utf-8 byte sequence");
- goto end;
- }
- if (tpe == TYPE_str) {
- if (width > 0 && !strNil(start)) {
- int w = UTF8_strlen(start);
- if (w > width) {
- msg = createException(SQL,
"sql.importColumn", "string too wide for column");
- goto end;
- }
- }
- value = start;
- } else {
- ssize_t n = ATOMfromstr(sa, tpe, &buffer,
&buffer_len, start, false);
- if (n <= 0) {
- msg = createException(SQL,
"sql.importColumn", GDK_EXCEPTION);
- goto end;
- }
- value = buffer;
- }
- if (bunfastapp(bat, value) != GDK_SUCCEED) {
- msg = createException(SQL, "sql.importColumn",
GDK_EXCEPTION);
- goto end;
- }
}
- bs->pos = start - buf_start;
+ #ifdef DEBUG_PRINTFS
+ fprintf(stderr, "# after read %zu: %zu..%zu/%zu = ", nread,
bs->pos, bs->len, bs->size);
+ for (size_t i = bs->pos; i < bs->len; i++)
+ fprintf(stderr, " %02X", (unsigned char) bs->buf[i]);
+ fprintf(stderr, "\n");
+ #endif
+ size_t consumed;
+ msg = insert_some_nul_terminated(&state, &bs->buf[bs->pos],
bs->len - bs->pos, &consumed);
+ #ifdef DEBUG_PRINTFS
+ fprintf(stderr, "# consumed %zu, left_over=%zu,
batcount=%zu\n", consumed, state.left_over, BATcount(bat));
+ #endif
+ if (msg != MAL_SUCCEED)
+ goto end;
+ bs->pos += consumed;
}
- // It's an error to have date left after falling out of the outer loop
if (bs->pos < bs->len)
bailout("unterminated string at end");
+ msg = MAL_SUCCEED;
end:
- *eof_reached = 0;
- // GDKfree(buffer);
+ release_insert_state(&state);
+ ma_close(ma, &ma_state);
if (bs != NULL) {
- *eof_reached = (int)bs->eof;
+ *eof_reached = bs->eof;
bs->s = NULL;
bstream_destroy(bs);
}
- ma_destroy(sa);
return msg;
}
+
+
static str
dump_zero_terminated_text(BAT *bat, stream *s, BUN start, BUN length, bool
byteswap)
{
diff --git a/sql/backends/monet5/sql_bincopyconvert.h b/sql/backends/monet5/sql_bincopyconvert.h
--- a/sql/backends/monet5/sql_bincopyconvert.h
+++ b/sql/backends/monet5/sql_bincopyconvert.h
@@ -69,5 +69,16 @@ extern bool can_dump_binary_column(type_
extern str dump_binary_column(type_record_t *rec, BAT *b, BUN start, BUN
length, bool byteswap, stream *s);
+struct insert_state {
+ BAT *bat;
+ int width;
+ void *scratch;
+ size_t schratch_len;
+ size_t left_over;
+ allocator *ma;
+};
+extern void init_insert_state(struct insert_state *st, allocator *ma, BAT
*bat, int width);
+extern void release_insert_state(struct insert_state *st);
+extern str insert_some_nul_terminated(struct insert_state *st, const char
*data, size_t total_len, size_t *consumed);
#endif
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]