Changeset: 8adcd9017410 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8adcd9017410
Modified Files:
        monetdb5/modules/atoms/mtime.c
        monetdb5/modules/atoms/mtime.h
        sql/backends/monet5/sql_result.c
        sql/backends/monet5/sql_result.h
Branch: protocol
Log Message:

Correctly support chunk export in new protocol.


diffs (truncated from 1120 to 300 lines):

diff --git a/monetdb5/modules/atoms/mtime.c b/monetdb5/modules/atoms/mtime.c
--- a/monetdb5/modules/atoms/mtime.c
+++ b/monetdb5/modules/atoms/mtime.c
@@ -2659,6 +2659,24 @@ MTIMEepoch2int(int *ret, const timestamp
 }
 
 str
+MTIMEepoch2lng(lng *ret, const timestamp *t)
+{
+       timestamp e;
+       lng v;
+       str err;
+
+       if ((err = MTIMEunix_epoch(&e)) != MAL_SUCCEED)
+               return err;
+       if ((err = MTIMEtimestamp_diff(&v, t, &e)) != MAL_SUCCEED)
+               return err;
+       if (v == lng_nil)
+               *ret = int_nil;
+       else
+               *ret = v;
+       return MAL_SUCCEED;
+}
+
+str
 MTIMEepoch_bulk(bat *ret, bat *bid)
 {
        timestamp epoch;
diff --git a/monetdb5/modules/atoms/mtime.h b/monetdb5/modules/atoms/mtime.h
--- a/monetdb5/modules/atoms/mtime.h
+++ b/monetdb5/modules/atoms/mtime.h
@@ -164,6 +164,7 @@ mal_export str MTIMEdaytime_diff(lng *re
 mal_export str MTIMEtimestamp_diff(lng *ret, const timestamp *v1, const 
timestamp *v2);
 mal_export str MTIMEtimestamp_diff_bulk(bat *ret, const bat *bid1, const bat 
*bid2);
 mal_export str MTIMEtimestamp_inside_dst(bit *ret, const timestamp *p, const 
tzone *z);
+mal_export str MTIMEepoch2lng(lng *res, const timestamp *ts);
 mal_export str MTIMEepoch_bulk(bat *ret, bat *bid);
 
 mal_export str MTIMEtimestamp_year(int *ret, const timestamp *t);
diff --git a/sql/backends/monet5/sql_result.c b/sql/backends/monet5/sql_result.c
--- a/sql/backends/monet5/sql_result.c
+++ b/sql/backends/monet5/sql_result.c
@@ -1298,6 +1298,321 @@ mvc_export_row(backend *b, stream *s, re
        return (ok) ? 0 : -1;
 }
 
+static int type_supports_binary_transfer(sql_type *type) {
+       return 
+               type->eclass == EC_BIT ||
+               type->eclass == EC_POS ||
+               type->eclass == EC_CHAR || 
+               type->eclass == EC_STRING ||
+               type->eclass == EC_DEC || 
+               type->eclass == EC_BLOB ||
+               type->eclass == EC_FLT || 
+               type->eclass == EC_NUM || 
+//             type->eclass == EC_DATE || 
+               type->eclass == EC_TIME || 
+               type->eclass == EC_SEC ||
+               type->eclass == EC_MONTH || 
+               type->eclass == EC_TIMESTAMP;
+}
+
+
+static int write_str_term(stream* s, const char* const val) {
+       return mnstr_writeStr(s, val) && mnstr_writeBte(s, 0);
+}
+
+// align to 8 bytes
+static char* 
+eight_byte_align(char* ptr) {
+       return (char*) (((size_t) ptr + 7) & ~7);
+}
+
+static int
+mvc_export_table_prot10(backend *b, stream *s, res_table *t, BAT *order, BUN 
offset, BUN nr) {
+       lng count = 0;
+       size_t row = 0;
+       size_t srow = 0;
+       size_t varsized = 0;
+       size_t length_prefixed = 0;
+       lng fixed_lengths = 0;
+       int fres = 0;
+       size_t i = 0;
+       size_t bsize = b->client->blocksize;
+       BATiter *iterators = NULL;
+       char *result = NULL;
+       int length = 0;
+
+       (void) order; // FIXME: respect explicitly ordered output
+
+       iterators = GDKzalloc(sizeof(BATiter) * t->nr_cols);
+       if (!iterators) {
+               fres = -1;
+               goto cleanup;
+       }
+
+       // ensure the buffer is currently empty
+       if (bs2_buffer(s).pos != 0) {
+               // clear the buffer
+               if (mnstr_flush(s) < 0) {
+                       fres = -1;
+                       goto cleanup;
+               }
+       }
+
+       // inspect all the columns to figure out how many bytes it takes to 
transfer one row
+       for (i = 0; i < (size_t) t->nr_cols; i++) {
+               res_col *c = t->cols + i;
+               BAT *b = BATdescriptor(c->b);
+               int mtype = b->ttype;
+               int typelen = ATOMsize(mtype);
+               int convert_to_string = 
!type_supports_binary_transfer(c->type.type) && b->ttype != TYPE_json;
+               sql_type *type = c->type.type;
+
+               iterators[i] = bat_iterator(b);
+               
+               if (type->eclass == EC_TIMESTAMP) {
+                       // timestamps are converted to Unix Timestamps
+                       mtype = TYPE_lng;
+                       typelen = sizeof(lng);  
+               }
+               if (ATOMvarsized(mtype) || convert_to_string) {
+                       typelen = -1;
+                       varsized++;
+                       length_prefixed++;
+               } else {
+                       fixed_lengths += typelen;
+               }
+       }
+
+
+       // now perform the actual transfer
+       row = srow = offset;
+       count = nr;
+       while (row < (size_t) count) {
+               char *buf = bs2_buffer(s).buf;
+               size_t crow = 0;
+               size_t bytes_left = bsize - sizeof(lng) - 2 * sizeof(char) - 1;
+               // potential padding that has to be added for each column
+               bytes_left -= t->nr_cols * 7;
+
+               // every varsized member has an 8-byte header indicating the 
length of the header in the block
+               // subtract this from the amount of bytes left
+               bytes_left -= length_prefixed * sizeof(lng);
+
+               if (varsized == 0) {
+                       // no varsized elements, so we can immediately compute 
the amount of elements
+                       if (fixed_lengths == 0) {
+                               row = (size_t) count;
+                       } else {
+                               row = (size_t) (srow + bytes_left / 
fixed_lengths);
+                               row = row > (size_t) count ? (size_t) count : 
row;
+                       }
+               } else {
+                       size_t rowsize = 0;
+                       // we have varsized elements, so we have to loop to 
determine how many rows fit into a buffer
+                       while (row < (size_t) count) {
+                               rowsize = (size_t) fixed_lengths;
+                               for (i = 0; i < (size_t) t->nr_cols; i++) {
+                                       res_col *c = t->cols + i;
+                                       int mtype = iterators[i].b->ttype;
+                                       int convert_to_string = 
!type_supports_binary_transfer(c->type.type);
+                                       if (convert_to_string || 
ATOMvarsized(mtype)) {
+                                               if (c->type.type->eclass == 
EC_BLOB) {
+                                                       blob *b = (blob*) 
BUNtail(iterators[i], row);
+                                                       rowsize += sizeof(lng) 
+ ((b->nitems == ~(size_t) 0) ? 0 : b->nitems);
+                                               } else {
+                                                       size_t slen = 
strlen((const char*) BUNtail(iterators[i], row));
+                                                       rowsize += slen + 1;
+                                               }
+                                       }
+                               }
+                               if (bytes_left < rowsize) {
+                                       break;
+                               }
+                               bytes_left -= rowsize;
+                               row++;
+                       }
+                       if (row == srow) {
+                               lng new_size = rowsize + 1024;
+                               if (!mnstr_writeLng(s, (lng) -1) || 
+                                       !mnstr_writeLng(s, new_size) || 
+                                       mnstr_flush(s) < 0) {
+                                       fres = -1;
+                                       goto cleanup;
+                               }
+                               row = srow + 1;
+                               if (bs2_resizebuf(s, (size_t) new_size) < 0) {
+                                       // failed to resize stream buffer
+                                       fres = -1;
+                                       goto cleanup;
+                               }
+                               buf = bs2_buffer(s).buf;
+                               bsize = (size_t) new_size;
+                       }
+               }
+
+               // have to transfer at least one row
+               assert(row > srow);
+               // buffer has to be empty currently
+               assert(bs2_buffer(s).pos == 0);
+
+               // continuation message
+               char* message_header = "+\n";
+               if (row >= (size_t) count) {
+                       // final message
+                       message_header = "-\n";
+               }
+               if (!mnstr_writeStr(s, message_header) || !mnstr_writeLng(s, 
(lng)(row - srow))) {
+                       fres = -1;
+                       goto cleanup;
+               }
+               buf += sizeof(lng) + 2 * sizeof(char);
+
+               for (i = 0; i < (size_t) t->nr_cols; i++) {
+                       res_col *c = t->cols + i;
+                       int mtype = iterators[i].b->ttype;
+                       int convert_to_string = 
!type_supports_binary_transfer(c->type.type);
+                       buf = eight_byte_align(buf);
+                       if (ATOMvarsized(mtype) || convert_to_string) {
+                               if (c->type.type->eclass == EC_BLOB) {
+                                       // transfer blobs as [lng][data] 
combination
+                                       char *startbuf = buf;
+                                       buf += sizeof(lng);
+                                       for (crow = srow; crow < row; crow++) {
+                                               blob *b = (blob*) 
BUNtail(iterators[i], crow);
+                                               if (b->nitems == ~(size_t) 0) {
+                                                       (*(lng*)buf) = 
mnstr_swap_lng(s, -1);
+                                                       buf += sizeof(lng);
+                                               } else {
+                                                       (*(lng*)buf) = 
mnstr_swap_lng(s, (lng) b->nitems);
+                                                       buf += sizeof(lng);
+                                                       memcpy(buf, b->data, 
b->nitems);
+                                                       buf += b->nitems;
+                                               }
+                                       }
+                                       // after the loop we know the size of 
the column, so write it
+                                       *((lng*)startbuf) = mnstr_swap_lng(s, 
buf - (startbuf + sizeof(lng)));
+                               } else {
+                                       // for variable length strings and 
large fixed strings we use varints
+                                       // variable columns are prefixed by a 
length, 
+                                       // but since we don't know the length 
yet, just skip over it for now
+                                       char *startbuf = buf;
+                                       buf += sizeof(lng);
+                                       for (crow = srow; crow < row; crow++) {
+                                               void *element = (void*) 
BUNtail(iterators[i], crow);
+                                               const char* str;
+                                               if (convert_to_string) {
+                                                       if 
(BATatoms[mtype].atomCmp(element, BATatoms[mtype].atomNull) == 0) {
+                                                               str = str_nil;
+                                                       } else {
+                                                               if 
(BATatoms[mtype].atomToStr(&result, &length, element) == 0) {
+                                                                       fres = 
-1;
+                                                                       goto 
cleanup;
+                                                               }
+                                                               // string 
conversion functions add quotes for the old protocol
+                                                               // because 
obviously adding quotes in the string conversion function
+                                                               // makes total 
sense, rather than adding the quotes in the protocol
+                                                               // thus because 
of this totally, 100% sensical implementation
+                                                               // we remove 
the quotes again here
+                                                               if (result[0] 
== '"') {
+                                                                       
result[strlen(result) - 1] = '\0';
+                                                                       str = 
result + 1;
+                                                               } else {
+                                                                       str = 
result;
+                                                               }
+                                                       }
+                                               } else {
+                                                       str = (char*) element;
+                                               }
+                                               buf = mystpcpy(buf, str) + 1;
+                                               assert(buf - bs2_buffer(s).buf 
<= (lng) bsize);
+                                       }
+                                       *((lng*)startbuf) = mnstr_swap_lng(s, 
buf - (startbuf + sizeof(lng)));
+                               }
+                       } else {
+                               int atom_size = ATOMsize(mtype);
+                               if (c->type.type->eclass == EC_DEC) {
+                                       atom_size = 
ATOMsize(ATOMstorage(mtype));
+                               }
+                               if (c->type.type->eclass == EC_TIMESTAMP) {
+                                       atom_size = sizeof(lng);
+                                       // convert timestamp values to epoch
+                                       lng time;
+                                       size_t j = 0;
+                                       bool swap = mnstr_byteorder(s) != 1234;
+                                       timestamp *times = (timestamp*) 
Tloc(iterators[i].b, srow);
+                                       lng *bufptr = (lng*) buf;
+                                       for(j = 0; j < (row - srow); j++) {
+                                               MTIMEepoch2lng(&time, times + 
j);
+                                               bufptr[j] = swap ? 
long_long_SWAP(time) : time;
+                                       }
+                               } else {
+                                       if (mnstr_byteorder(s) != 1234) {
+                                               size_t j = 0;
+                                               switch(ATOMstorage(mtype)) {
+                                                       case TYPE_sht: {
+                                                               short *bufptr = 
(short*) buf;
+                                                               short 
*exported_values = (short*) Tloc(iterators[i].b, srow);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to