Changeset: dc34cc50d1f2 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/dc34cc50d1f2
Modified Files:
sql/backends/monet5/sql_bincopyconvert.c
Branch: copyfaster
Log Message:
Add buffering to COPY BINARY blobs
diffs (242 lines):
diff --git a/sql/backends/monet5/sql_bincopyconvert.c b/sql/backends/monet5/sql_bincopyconvert.c
--- a/sql/backends/monet5/sql_bincopyconvert.c
+++ b/sql/backends/monet5/sql_bincopyconvert.c
@@ -631,127 +631,136 @@ end:
return msg;
}
-// Some streams, in particular the mapi upload stream, sometimes read fewer
-// bytes than requested. This function wraps the read in a loop to force it to
-// read the whole block
-static ssize_t
-read_exact(stream *s, void *buffer, size_t length)
+static str
+process_blobs(bstream *bs, BAT *bat, bool byteswap, char **scratch, size_t
*scratch_len)
{
- char *p = buffer;
+ static const char mal_operator[] = "sql.importColumn";
+ const blob *nil_value = ATOMnilptr(TYPE_blob);
+ const blob empty_value = (blob) {
+ .nitems = 0,
+ };
+ uint64_t header;
+
+ // long count = 0;
+ // size_t old_pos = bs->pos;
+ // fprintf(stderr, "## start bs { .pos=%zu .len=%zu .size=%zu}\n",
bs->pos, bs->len, bs->size);
+
+ while (true) {
+ // fprintf(stderr,"######## bs { .pos=%zu .len=%zu .size=%zu}",
bs->pos, bs->len, bs->size);
+ // for (size_qt i = 0; i < bs->len - bs->pos && i < 8; i++) {
+ // char *sep = (i > 0 && i % 4 == 0) ? " " : "";
+ // fprintf(stderr, "%s %02x", sep, (unsigned
int)bs->buf[bs->pos + i]);
+ // }
+ // fprintf(stderr, "\n");
+ size_t len = bs->len - bs->pos;
+ if (len < 8)
+ break;
+ memcpy(&header, bs->buf + bs->pos, 8);
+ if (byteswap)
+ header = copy_binary_byteswap64(header);
- while (length > 0) {
- ssize_t nread = mnstr_read(s, p, 1, length);
- if (nread < 0) {
- return nread;
- } else if (nread == 0) {
+ // Determine what to insert. Return MAL_SUCCEED if incomplete,
+ // outer loop will retry with more data.
+ const blob *value;
+ size_t size;
+ if (header == ~(uint64_t)0) {
+ value = nil_value;
+ size = 0;
+ } else if (header == 0) {
+ value = &empty_value;
+ size = 0;
+ } else if (len < header + 8) {
break;
+ } else if (header > (uint64_t)VAR_MAX) {
+ throw(MAL, mal_operator, SQLSTATE(42000) "blob too
long");
} else {
- p += nread;
- length -= nread;
+ // Copy it to scratch.
+ // We can't use it in-place because it's probably
misaligned.
+ size = header;
+ size_t needed = size + sizeof(empty_value);
+ if (*scratch == NULL || *scratch_len < needed) {
+ // Reallocate the buffer. Do not use realloc,
we don't
+ // care about the existing contents.
+ GDKfree(*scratch);
+ *scratch = NULL;
+ *scratch_len = 0;
+ size_t allocate = needed;
+ allocate += allocate / 16;
+ allocate += (~allocate + 1) % (1024 * 1024);
+ *scratch = GDKmalloc(allocate);
+ if (*scratch == NULL) {
+ throw(SQL, "sql", SQLSTATE(HY013)
MAL_MALLOC_FAIL);
+ }
+ *scratch_len = allocate;
+ }
+ blob *scratchblob = (blob*)*scratch;
+ scratchblob->nitems = size;
+ memcpy(scratchblob->data, bs->buf + bs->pos + 8, size);
+ value = scratchblob;
}
+
+ // Insert the blob and update the buffer position
+ // fprintf(stderr, "######## append size=%zu nil=%d\n", size,
value == nil_value);
+ if (bunfastapp(bat, value) != GDK_SUCCEED) {
+ throw(SQL, mal_operator, GDK_EXCEPTION);
+ }
+ bs->pos += 8 + size;
+ // count++;
}
-
- return p - (char*)buffer;
+ // fprintf(stderr, "## end bs { .pos=%zu .len=%zu .size=%zu}\n",
bs->pos, bs->len, bs->size);
+ // fprintf(stderr, "## processed %zu bytes, %zu left, %ld items\n",
bs->pos - old_pos, bs->len - bs->pos, count);
+ return MAL_SUCCEED;
}
// Read BLOBs. Every blob is preceded by a 64bit header word indicating its
length.
// NULLs are indicated by length==-1
static str
-load_blob(BAT *bat, stream *s, int *eof_reached, int width, bool byteswap)
+load_blobs(BAT *bat, stream *s, int *eof_reached, int width, bool byteswap)
{
(void)width;
static const char mal_operator[] = "sql.importColumn";
- str msg = MAL_SUCCEED;
- const blob *nil_value = ATOMnilptr(TYPE_blob);
- blob *buffer = NULL;
- size_t buffer_size = 0;
- union {
- uint64_t length;
- char bytes[8];
- } header;
+ str msg;
+ bstream *bs= NULL;
+ char *scratch = NULL;
+ size_t scratch_len = 0;
*eof_reached = 0;
- /* we know nothing about the ordering of the input data */
- bat->tsorted = false;
- bat->trevsorted = false;
- bat->tkey = false;
- /* keep tno* properties: if they're set they remain valid when
- * appending */
- while (1) {
- const blob *value;
- // Read the header
- ssize_t nread = read_exact(s, header.bytes, 8);
+ bs = bstream_create(s, 1024 * 1024); // TODO small value for testing,
make it larger
+ if (bs == NULL) {
+ msg = createException(SQL, "sql", SQLSTATE(HY013)
MAL_MALLOC_FAIL);
+ goto end;
+ }
+
+ // Load chunks of data into our buffer and process any complete blobs
we find
+ while (!bs->eof) {
+ ssize_t nread = bstream_next(bs);
if (nread < 0) {
bailout("%s", mnstr_peek_error(s));
} else if (nread == 0) {
*eof_reached = 1;
- break;
- } else if (nread < 8) {
- bailout("incomplete blob at end of file");
- }
- if (byteswap) {
- copy_binary_convert64(&header.length);
+ assert(bs->eof);
}
-
- if (header.length == ~(uint64_t)0) {
- value = nil_value;
- bat->tnonil = false;
- bat->tnil = true;
- } else {
- size_t length;
- size_t needed;
-
- if (header.length >= VAR_MAX) {
- bailout("blob too long");
- }
- length = (size_t) header.length;
-
- // Reallocate the buffer
- needed = sizeof(blob) + length;
- if (buffer_size < needed) {
- // do not use GDKrealloc, no need to copy the
old contents
- GDKfree(buffer);
- size_t allocate = needed;
- allocate += allocate / 16; // add a little
margin
-#ifdef _MSC_VER
-#pragma warning(suppress:4146)
-#endif
- allocate += ((~allocate + 1) % 0x100000); //
round up to nearest MiB
- assert(allocate >= needed);
- buffer = GDKmalloc(allocate);
- if (!buffer) {
- msg = createException(SQL, "sql",
SQLSTATE(HY013) MAL_MALLOC_FAIL);
- goto end;
- }
- buffer_size = allocate;
- }
-
- // Fill the buffer
- buffer->nitems = length;
- if (length > 0) {
- nread = read_exact(s, buffer->data, length);
- if (nread < 0) {
- bailout("%s", mnstr_peek_error(s));
- } else if ((size_t)nread < length) {
- bailout("Incomplete blob at end of
file");
- }
- }
-
- value = buffer;
- }
-
- if (bunfastapp(bat, value) != GDK_SUCCEED) {
- msg = createException(SQL, mal_operator,
GDK_EXCEPTION);
- goto end;
- }
+ msg = process_blobs(bs, bat, byteswap, &scratch, &scratch_len);
+ if (msg != MAL_SUCCEED)
+ goto end;
+ }
+ if (bs->pos < bs->len) {
+ bailout("incomplete blob at end");
}
+ msg = MAL_SUCCEED;
end:
- GDKfree(buffer);
+ if (bs) {
+ *eof_reached = bs->eof;
+ bs->s = NULL;
+ }
+ bstream_destroy(bs);
+ GDKfree(scratch);
return msg;
}
+
static str
dump_blob(BAT *bat, stream *s, BUN start, BUN length, bool byteswap)
{
@@ -802,7 +811,7 @@ static struct type_record_t type_recs[]
{ "hge", "hge", .trivial_if_no_byteswap=true, .decoder=byteswap_hge,
.encoder=byteswap_hge, .validate=validate_hge },
#endif
- { "blob", "blob", .loader=load_blob, .dumper=dump_blob },
+ { "blob", "blob", .loader=load_blobs, .dumper=dump_blob },
// \0-terminated text records
{ "str", "str", .loader=load_zero_terminated_text,
.dumper=dump_zero_terminated_text },
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]