Changeset: 8adcd9017410 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8adcd9017410
Modified Files:
monetdb5/modules/atoms/mtime.c
monetdb5/modules/atoms/mtime.h
sql/backends/monet5/sql_result.c
sql/backends/monet5/sql_result.h
Branch: protocol
Log Message:
Correctly support chunk export in new protocol.
diffs (truncated from 1120 to 300 lines):
diff --git a/monetdb5/modules/atoms/mtime.c b/monetdb5/modules/atoms/mtime.c
--- a/monetdb5/modules/atoms/mtime.c
+++ b/monetdb5/modules/atoms/mtime.c
@@ -2659,6 +2659,24 @@ MTIMEepoch2int(int *ret, const timestamp
}
str
+MTIMEepoch2lng(lng *ret, const timestamp *t)
+{
+ timestamp e;
+ lng v;
+ str err;
+
+ if ((err = MTIMEunix_epoch(&e)) != MAL_SUCCEED)
+ return err;
+ if ((err = MTIMEtimestamp_diff(&v, t, &e)) != MAL_SUCCEED)
+ return err;
+ /* Store the difference between *t and the Unix epoch in *ret.  A nil
+  * difference is handed on unchanged as lng_nil: the result is a lng,
+  * so storing int_nil would not compare equal to lng_nil afterwards. */
+ *ret = v;
+ return MAL_SUCCEED;
+}
+
+str
MTIMEepoch_bulk(bat *ret, bat *bid)
{
timestamp epoch;
diff --git a/monetdb5/modules/atoms/mtime.h b/monetdb5/modules/atoms/mtime.h
--- a/monetdb5/modules/atoms/mtime.h
+++ b/monetdb5/modules/atoms/mtime.h
@@ -164,6 +164,7 @@ mal_export str MTIMEdaytime_diff(lng *re
mal_export str MTIMEtimestamp_diff(lng *ret, const timestamp *v1, const
timestamp *v2);
mal_export str MTIMEtimestamp_diff_bulk(bat *ret, const bat *bid1, const bat
*bid2);
mal_export str MTIMEtimestamp_inside_dst(bit *ret, const timestamp *p, const
tzone *z);
+mal_export str MTIMEepoch2lng(lng *ret, const timestamp *t); /* *ret: *t relative to the Unix epoch */
mal_export str MTIMEepoch_bulk(bat *ret, bat *bid);
mal_export str MTIMEtimestamp_year(int *ret, const timestamp *t);
diff --git a/sql/backends/monet5/sql_result.c b/sql/backends/monet5/sql_result.c
--- a/sql/backends/monet5/sql_result.c
+++ b/sql/backends/monet5/sql_result.c
@@ -1298,6 +1298,321 @@ mvc_export_row(backend *b, stream *s, re
return (ok) ? 0 : -1;
}
+/* Report whether values of the given SQL type are sent in binary form.
+ * Returns 1 for the type classes listed below and 0 for every other
+ * class; EC_DATE is not in the list, so it yields 0. */
+static int type_supports_binary_transfer(sql_type *type) {
+ switch (type->eclass) {
+ case EC_BIT:
+ case EC_POS:
+ case EC_CHAR:
+ case EC_STRING:
+ case EC_DEC:
+ case EC_BLOB:
+ case EC_FLT:
+ case EC_NUM:
+ case EC_TIME:
+ case EC_SEC:
+ case EC_MONTH:
+ case EC_TIMESTAMP:
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+
+/* Write val with mnstr_writeStr and then a 0 with mnstr_writeBte.
+ * Returns 1 when both calls report a non-zero result, 0 otherwise; the
+ * second write is not attempted when the first one reports 0. */
+static int write_str_term(stream* s, const char* const val) {
+ if (!mnstr_writeStr(s, val)) {
+ return 0;
+ }
+ return mnstr_writeBte(s, 0) != 0;
+}
+
+// align to 8 bytes
+static char*
+eight_byte_align(char* ptr) {
+ /* Round the address up to the next multiple of 8; an address that is
+  * already a multiple of 8 is returned unchanged. */
+ const size_t mask = 7;
+ size_t addr = (size_t) ptr;
+ addr = (addr + mask) & ~mask;
+ return (char*) addr;
+}
+
+static int
+mvc_export_table_prot10(backend *b, stream *s, res_table *t, BAT *order, BUN
offset, BUN nr) {
+ lng count = 0;
+ size_t row = 0;
+ size_t srow = 0;
+ size_t varsized = 0;
+ size_t length_prefixed = 0;
+ lng fixed_lengths = 0;
+ int fres = 0;
+ size_t i = 0;
+ size_t bsize = b->client->blocksize;
+ BATiter *iterators = NULL;
+ char *result = NULL;
+ int length = 0;
+
+ (void) order; // FIXME: respect explicitly ordered output
+
+ iterators = GDKzalloc(sizeof(BATiter) * t->nr_cols);
+ if (!iterators) {
+ fres = -1;
+ goto cleanup;
+ }
+
+ // ensure the buffer is currently empty
+ if (bs2_buffer(s).pos != 0) {
+ // clear the buffer
+ if (mnstr_flush(s) < 0) {
+ fres = -1;
+ goto cleanup;
+ }
+ }
+
+ // inspect all the columns to figure out how many bytes it takes to
transfer one row
+ for (i = 0; i < (size_t) t->nr_cols; i++) {
+ res_col *c = t->cols + i;
+ BAT *b = BATdescriptor(c->b);
+ int mtype = b->ttype;
+ int typelen = ATOMsize(mtype);
+ int convert_to_string =
!type_supports_binary_transfer(c->type.type) && b->ttype != TYPE_json;
+ sql_type *type = c->type.type;
+
+ iterators[i] = bat_iterator(b);
+
+ if (type->eclass == EC_TIMESTAMP) {
+ // timestamps are converted to Unix Timestamps
+ mtype = TYPE_lng;
+ typelen = sizeof(lng);
+ }
+ if (ATOMvarsized(mtype) || convert_to_string) {
+ typelen = -1;
+ varsized++;
+ length_prefixed++;
+ } else {
+ fixed_lengths += typelen;
+ }
+ }
+
+
+ // now perform the actual transfer
+ row = srow = offset;
+ count = nr;
+ while (row < (size_t) count) {
+ char *buf = bs2_buffer(s).buf;
+ size_t crow = 0;
+ size_t bytes_left = bsize - sizeof(lng) - 2 * sizeof(char) - 1;
+ // potential padding that has to be added for each column
+ bytes_left -= t->nr_cols * 7;
+
+ // every varsized member has an 8-byte header indicating the
length of the header in the block
+ // subtract this from the amount of bytes left
+ bytes_left -= length_prefixed * sizeof(lng);
+
+ if (varsized == 0) {
+ // no varsized elements, so we can immediately compute
the amount of elements
+ if (fixed_lengths == 0) {
+ row = (size_t) count;
+ } else {
+ row = (size_t) (srow + bytes_left /
fixed_lengths);
+ row = row > (size_t) count ? (size_t) count :
row;
+ }
+ } else {
+ size_t rowsize = 0;
+ // we have varsized elements, so we have to loop to
determine how many rows fit into a buffer
+ while (row < (size_t) count) {
+ rowsize = (size_t) fixed_lengths;
+ for (i = 0; i < (size_t) t->nr_cols; i++) {
+ res_col *c = t->cols + i;
+ int mtype = iterators[i].b->ttype;
+ int convert_to_string =
!type_supports_binary_transfer(c->type.type);
+ if (convert_to_string ||
ATOMvarsized(mtype)) {
+ if (c->type.type->eclass ==
EC_BLOB) {
+ blob *b = (blob*)
BUNtail(iterators[i], row);
+ rowsize += sizeof(lng)
+ ((b->nitems == ~(size_t) 0) ? 0 : b->nitems);
+ } else {
+ size_t slen =
strlen((const char*) BUNtail(iterators[i], row));
+ rowsize += slen + 1;
+ }
+ }
+ }
+ if (bytes_left < rowsize) {
+ break;
+ }
+ bytes_left -= rowsize;
+ row++;
+ }
+ if (row == srow) {
+ lng new_size = rowsize + 1024;
+ if (!mnstr_writeLng(s, (lng) -1) ||
+ !mnstr_writeLng(s, new_size) ||
+ mnstr_flush(s) < 0) {
+ fres = -1;
+ goto cleanup;
+ }
+ row = srow + 1;
+ if (bs2_resizebuf(s, (size_t) new_size) < 0) {
+ // failed to resize stream buffer
+ fres = -1;
+ goto cleanup;
+ }
+ buf = bs2_buffer(s).buf;
+ bsize = (size_t) new_size;
+ }
+ }
+
+ // have to transfer at least one row
+ assert(row > srow);
+ // buffer has to be empty currently
+ assert(bs2_buffer(s).pos == 0);
+
+ // continuation message
+ char* message_header = "+\n";
+ if (row >= (size_t) count) {
+ // final message
+ message_header = "-\n";
+ }
+ if (!mnstr_writeStr(s, message_header) || !mnstr_writeLng(s,
(lng)(row - srow))) {
+ fres = -1;
+ goto cleanup;
+ }
+ buf += sizeof(lng) + 2 * sizeof(char);
+
+ for (i = 0; i < (size_t) t->nr_cols; i++) {
+ res_col *c = t->cols + i;
+ int mtype = iterators[i].b->ttype;
+ int convert_to_string =
!type_supports_binary_transfer(c->type.type);
+ buf = eight_byte_align(buf);
+ if (ATOMvarsized(mtype) || convert_to_string) {
+ if (c->type.type->eclass == EC_BLOB) {
+ // transfer blobs as [lng][data]
combination
+ char *startbuf = buf;
+ buf += sizeof(lng);
+ for (crow = srow; crow < row; crow++) {
+ blob *b = (blob*)
BUNtail(iterators[i], crow);
+ if (b->nitems == ~(size_t) 0) {
+ (*(lng*)buf) =
mnstr_swap_lng(s, -1);
+ buf += sizeof(lng);
+ } else {
+ (*(lng*)buf) =
mnstr_swap_lng(s, (lng) b->nitems);
+ buf += sizeof(lng);
+ memcpy(buf, b->data,
b->nitems);
+ buf += b->nitems;
+ }
+ }
+ // after the loop we know the size of
the column, so write it
+ *((lng*)startbuf) = mnstr_swap_lng(s,
buf - (startbuf + sizeof(lng)));
+ } else {
+ // for variable length strings and
large fixed strings we use varints
+ // variable columns are prefixed by a
length,
+ // but since we don't know the length
yet, just skip over it for now
+ char *startbuf = buf;
+ buf += sizeof(lng);
+ for (crow = srow; crow < row; crow++) {
+ void *element = (void*)
BUNtail(iterators[i], crow);
+ const char* str;
+ if (convert_to_string) {
+ if
(BATatoms[mtype].atomCmp(element, BATatoms[mtype].atomNull) == 0) {
+ str = str_nil;
+ } else {
+ if
(BATatoms[mtype].atomToStr(&result, &length, element) == 0) {
+ fres =
-1;
+ goto
cleanup;
+ }
+ // string
conversion functions add quotes for the old protocol
+ // because
obviously adding quotes in the string conversion function
+ // makes total
sense, rather than adding the quotes in the protocol
+ // thus because
of this totally, 100% sensical implementation
+ // we remove
the quotes again here
+ if (result[0]
== '"') {
+
result[strlen(result) - 1] = '\0';
+ str =
result + 1;
+ } else {
+ str =
result;
+ }
+ }
+ } else {
+ str = (char*) element;
+ }
+ buf = mystpcpy(buf, str) + 1;
+ assert(buf - bs2_buffer(s).buf
<= (lng) bsize);
+ }
+ *((lng*)startbuf) = mnstr_swap_lng(s,
buf - (startbuf + sizeof(lng)));
+ }
+ } else {
+ int atom_size = ATOMsize(mtype);
+ if (c->type.type->eclass == EC_DEC) {
+ atom_size =
ATOMsize(ATOMstorage(mtype));
+ }
+ if (c->type.type->eclass == EC_TIMESTAMP) {
+ atom_size = sizeof(lng);
+ // convert timestamp values to epoch
+ lng time;
+ size_t j = 0;
+ bool swap = mnstr_byteorder(s) != 1234;
+ timestamp *times = (timestamp*)
Tloc(iterators[i].b, srow);
+ lng *bufptr = (lng*) buf;
+ for(j = 0; j < (row - srow); j++) {
+ MTIMEepoch2lng(&time, times +
j);
+ bufptr[j] = swap ?
long_long_SWAP(time) : time;
+ }
+ } else {
+ if (mnstr_byteorder(s) != 1234) {
+ size_t j = 0;
+ switch(ATOMstorage(mtype)) {
+ case TYPE_sht: {
+ short *bufptr =
(short*) buf;
+ short
*exported_values = (short*) Tloc(iterators[i].b, srow);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list