Changeset: dc34cc50d1f2 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/dc34cc50d1f2
Modified Files:
        sql/backends/monet5/sql_bincopyconvert.c
Branch: copyfaster
Log Message:

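Add buffering to COPY BINARY blob loading: read the input through a bstream buffer and decode every complete length-prefixed blob found in it, instead of issuing separate exact-length reads for each header and payload.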


diffs (242 lines):

diff --git a/sql/backends/monet5/sql_bincopyconvert.c 
b/sql/backends/monet5/sql_bincopyconvert.c
--- a/sql/backends/monet5/sql_bincopyconvert.c
+++ b/sql/backends/monet5/sql_bincopyconvert.c
@@ -631,127 +631,136 @@ end:
        return msg;
 }
 
-// Some streams, in particular the mapi upload stream, sometimes read fewer
-// bytes than requested. This function wraps the read in a loop to force it to
-// read the whole block
-static ssize_t
-read_exact(stream *s, void *buffer, size_t length)
+static str
+process_blobs(bstream *bs, BAT *bat, bool byteswap, char **scratch, size_t 
*scratch_len)
 {
-       char *p = buffer;
+       static const char mal_operator[] = "sql.importColumn";
+       const blob *nil_value = ATOMnilptr(TYPE_blob);
+       const blob empty_value = (blob) {
+               .nitems = 0,
+       };
+       uint64_t header;
+
+       // long count = 0;
+       // size_t old_pos = bs->pos;
+       // fprintf(stderr, "## start bs { .pos=%zu .len=%zu .size=%zu}\n", 
bs->pos, bs->len, bs->size);
+
+       while (true) {
+               // fprintf(stderr,"######## bs { .pos=%zu .len=%zu .size=%zu}", 
bs->pos, bs->len, bs->size);
+               // for (size_qt i = 0; i < bs->len - bs->pos && i < 8; i++) {
+               //      char *sep = (i > 0 && i % 4 == 0) ? " " : "";
+               //      fprintf(stderr, "%s %02x", sep, (unsigned 
int)bs->buf[bs->pos + i]);
+               // }
+               // fprintf(stderr, "\n");
+               size_t len = bs->len - bs->pos;
+               if (len < 8)
+                       break;
+               memcpy(&header, bs->buf + bs->pos, 8);
+               if (byteswap)
+                       header = copy_binary_byteswap64(header);
 
-       while (length > 0) {
-               ssize_t nread = mnstr_read(s, p, 1, length);
-               if (nread < 0) {
-                       return nread;
-               } else if (nread == 0) {
+               // Determine what to insert. Return MAL_SUCCEED if incomplete,
+               // outer loop will retry with more data.
+               const blob *value;
+               size_t size;
+               if (header == ~(uint64_t)0) {
+                       value = nil_value;
+                       size = 0;
+               } else if (header == 0) {
+                       value = &empty_value;
+                       size = 0;
+               } else if (len < header + 8) {
                        break;
+               } else if (header > (uint64_t)VAR_MAX) {
+                       throw(MAL, mal_operator, SQLSTATE(42000) "blob too 
long");
                } else {
-                       p += nread;
-                       length -= nread;
+                       // Copy it to scratch.
+                       // We can't use it in-place because it's probably 
misaligned.
+                       size = header;
+                       size_t needed = size + sizeof(empty_value);
+                       if (*scratch == NULL || *scratch_len < needed) {
+                               // Reallocate the buffer. Do not use realloc, 
we don't
+                               // care about the existing contents.
+                               GDKfree(*scratch);
+                               *scratch = NULL;
+                               *scratch_len = 0;
+                               size_t allocate = needed;
+                               allocate += allocate / 16;
+                               allocate += (~allocate + 1) % (1024 * 1024);
+                               *scratch = GDKmalloc(allocate);
+                               if (*scratch == NULL) {
+                                       throw(SQL, "sql", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
+                               }
+                               *scratch_len = allocate;
+                       }
+                       blob *scratchblob = (blob*)*scratch;
+                       scratchblob->nitems = size;
+                       memcpy(scratchblob->data, bs->buf + bs->pos + 8, size);
+                       value = scratchblob;
                }
+
+               // Insert the blob and update the buffer position
+               // fprintf(stderr, "######## append size=%zu nil=%d\n", size, 
value == nil_value);
+               if (bunfastapp(bat, value) != GDK_SUCCEED) {
+                       throw(SQL, mal_operator, GDK_EXCEPTION);
+               }
+               bs->pos += 8 + size;
+               // count++;
        }
-
-       return p - (char*)buffer;
+       // fprintf(stderr, "## end bs { .pos=%zu .len=%zu .size=%zu}\n", 
bs->pos, bs->len, bs->size);
+       // fprintf(stderr, "## processed %zu bytes, %zu left, %ld items\n", 
bs->pos - old_pos, bs->len - bs->pos, count);
+       return MAL_SUCCEED;
 }
 
 // Read BLOBs.  Every blob is preceded by a 64bit header word indicating its 
length.
 // NULLs are indicated by length==-1
 static str
-load_blob(BAT *bat, stream *s, int *eof_reached, int width, bool byteswap)
+load_blobs(BAT *bat, stream *s, int *eof_reached, int width, bool byteswap)
 {
        (void)width;
        static const char mal_operator[] = "sql.importColumn";
-       str msg = MAL_SUCCEED;
-       const blob *nil_value = ATOMnilptr(TYPE_blob);
-       blob *buffer = NULL;
-       size_t buffer_size = 0;
-       union {
-               uint64_t length;
-               char bytes[8];
-       } header;
+       str msg;
+       bstream *bs= NULL;
+       char *scratch = NULL;
+       size_t scratch_len = 0;
 
        *eof_reached = 0;
 
-       /* we know nothing about the ordering of the input data */
-       bat->tsorted = false;
-       bat->trevsorted = false;
-       bat->tkey = false;
-       /* keep tno* properties: if they're set they remain valid when
-        * appending */
-       while (1) {
-               const blob *value;
-               // Read the header
-               ssize_t nread = read_exact(s, header.bytes, 8);
+       bs = bstream_create(s, 1024 * 1024); // TODO small value for testing, 
make it larger
+       if (bs == NULL) {
+               msg = createException(SQL, "sql", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
+               goto end;
+       }
+
+       // Load chunks of data into our buffer and process any complete blobs 
we find
+       while (!bs->eof) {
+               ssize_t nread = bstream_next(bs);
                if (nread < 0) {
                        bailout("%s", mnstr_peek_error(s));
                } else if (nread == 0) {
                        *eof_reached = 1;
-                       break;
-               } else if (nread < 8) {
-                       bailout("incomplete blob at end of file");
-               }
-               if (byteswap) {
-                       copy_binary_convert64(&header.length);
+                       assert(bs->eof);
                }
-
-               if (header.length == ~(uint64_t)0) {
-                       value = nil_value;
-                       bat->tnonil = false;
-                       bat->tnil = true;
-               } else {
-                       size_t length;
-                       size_t needed;
-
-                       if (header.length >= VAR_MAX) {
-                               bailout("blob too long");
-                       }
-                       length = (size_t) header.length;
-
-                       // Reallocate the buffer
-                       needed = sizeof(blob) + length;
-                       if (buffer_size < needed) {
-                               // do not use GDKrealloc, no need to copy the 
old contents
-                               GDKfree(buffer);
-                               size_t allocate = needed;
-                               allocate += allocate / 16;   // add a little 
margin
-#ifdef _MSC_VER
-#pragma warning(suppress:4146)
-#endif
-                               allocate += ((~allocate + 1) % 0x100000);   // 
round up to nearest MiB
-                               assert(allocate >= needed);
-                               buffer = GDKmalloc(allocate);
-                               if (!buffer) {
-                                       msg = createException(SQL, "sql", 
SQLSTATE(HY013) MAL_MALLOC_FAIL);
-                                       goto end;
-                               }
-                               buffer_size = allocate;
-                       }
-
-                       // Fill the buffer
-                       buffer->nitems = length;
-                       if (length > 0) {
-                               nread = read_exact(s, buffer->data, length);
-                               if (nread < 0) {
-                                       bailout("%s", mnstr_peek_error(s));
-                               } else if ((size_t)nread < length) {
-                                       bailout("Incomplete blob at end of 
file");
-                               }
-                       }
-
-                       value = buffer;
-               }
-
-               if (bunfastapp(bat, value) != GDK_SUCCEED) {
-                               msg = createException(SQL, mal_operator, 
GDK_EXCEPTION);
-                               goto end;
-               }
+               msg = process_blobs(bs, bat, byteswap, &scratch, &scratch_len);
+               if (msg != MAL_SUCCEED)
+                       goto end;
+       }
+       if (bs->pos < bs->len) {
+               bailout("incomplete blob at end");
        }
 
+       msg = MAL_SUCCEED;
 end:
-       GDKfree(buffer);
+       if (bs) {
+               *eof_reached = bs->eof;
+               bs->s = NULL;
+       }
+       bstream_destroy(bs);
+       GDKfree(scratch);
        return msg;
 }
 
+
 static str
 dump_blob(BAT *bat, stream *s, BUN start, BUN length, bool byteswap)
 {
@@ -802,7 +811,7 @@ static struct type_record_t type_recs[] 
        { "hge", "hge", .trivial_if_no_byteswap=true, .decoder=byteswap_hge, 
.encoder=byteswap_hge, .validate=validate_hge },
 #endif
 
-       { "blob", "blob", .loader=load_blob, .dumper=dump_blob },
+       { "blob", "blob", .loader=load_blobs, .dumper=dump_blob },
 
        // \0-terminated text records
        { "str", "str", .loader=load_zero_terminated_text, 
.dumper=dump_zero_terminated_text },
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to