Changeset: b469865d631c for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b469865d631c
Modified Files:
        clients/mapilib/mapi.c
        common/stream/stream.c
        common/stream/stream.h
        sql/backends/monet5/sql_result.c
Branch: protocol
Log Message:

Fixed issues with splitting the resultset into multiple protocol messages.


diffs (truncated from 338 to 300 lines):

diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -5454,48 +5454,80 @@ mapi_fetch_row(MapiHdl hdl)
        struct MapiResultSet *result;
 
        if (hdl->mid->protocol == prot10 || hdl->mid->protocol == 
prot10compressed) {
+#ifdef PROT10_DEBUG
+               char *initbuf;
+#endif
                char* buf;
 
                result = hdl->result;
-               result->rows_read++;
+               // check if we have read the entire result set
                if (result->rows_read >= result->row_count) {
                        hdl->mid->active = NULL;
                        hdl->active = NULL;
+                       bs2_resetbuf(hdl->mid->from);
                        return 0;
                }
-               // do we have any rows in our cache
-               if (result->rows_read > result->tuple_count) {
-                       // read block from socket
+               // if not, check if our cache is empty
+               if (result->rows_read >= result->tuple_count) {
+                       // if our cache is empty, we read data from the socket
                        lng nrows = 0;
+                       // first we write a prompt to the server indicating 
that we want another block of the result set
                        if (!mnstr_writeChr(hdl->mid->to, 42) || 
mnstr_flush(hdl->mid->to)) {
-                               // FIXME: set hdl->mid to something
-
+                               hdl->mid->errorstr = strdup("Failed to write 
confirm message to server.");
+                               hdl->mid->error = 0;
+                               fprintf(stderr, "Failure 2.\n");
                                return hdl->mid->error;
                        }
+
                        bs2_resetbuf(hdl->mid->from); // kinda a bit evil
-                       // this actually triggers the read of the entire block 
from now we operate on buffer
+                       assert(bs2_buffer(hdl->mid->from).pos == 0);
+
+                       // this actually triggers the read of the entire block
+                       // after this point we operate on the buffer
                        if (!mnstr_readLng(hdl->mid->from, &nrows)) {
                                // FIXME: set hdl->mid to something
+                               hdl->mid->errorstr = strdup("Failed to read row 
response");
+                               hdl->mid->error = 0;
+                               fprintf(stderr, "Failure 3.\n");
                                return hdl->mid->error;
                        }
-                       bs2_resetbuf(hdl->mid->from);
-
-//                     fprintf(stderr, "nrows=%llu\n", nrows);
+
+
+                       //bs2_resetbuf(hdl->mid->from);
+
+#ifdef PROT10_DEBUG
+                       fprintf(stderr, "Read block: %llu - %llu (out of %lld, 
nrow=%lld)\n", result->rows_read, result->rows_read + nrows, result->row_count, 
nrows);
+                       initbuf = (char*) bs2_getbuf(hdl->mid->from);
+#endif
+
                        buf = (char*) bs2_getbuf(hdl->mid->from) + sizeof(lng);
 
                        // iterate over cols
                        for (i = 0; i < (size_t) result->fieldcnt; i++) {
+#ifdef PROT10_DEBUG
+                               fprintf(stderr, "Column %zu\n", i);
+#endif
                                result->fields[i].buffer_ptr = buf;
                                if (result->fields[i].columnlength < 0) {
                                        // variable-length column
                                        lng col_len = *((lng*) buf);
+#ifdef PROT10_DEBUG
+                                       fprintf(stderr, "Read lng %lld from 
position %zu\n", col_len, buf - initbuf);
+#endif
                                        assert((size_t) col_len < 
hdl->mid->blocksize && col_len > 0);
                                        result->fields[i].buffer_ptr += 
sizeof(lng);
                                        buf += col_len + sizeof(lng);
+#ifdef PROT10_DEBUG
+                                       fprintf(stderr, "Read strings from 
position %zu\n", result->fields[i].buffer_ptr - initbuf);
+#endif
                                } else {
                                        buf += nrows * 
result->fields[i].columnlength;
+#ifdef PROT10_DEBUG
+                                       fprintf(stderr, "Read elements from 
position %zu\n", result->fields[i].buffer_ptr - initbuf);
+#endif
                                }
                        }
+                       assert(result->fields[result->fieldcnt - 1].buffer_ptr 
- result->fields[0].buffer_ptr < hdl->mid->blocksize);
                        result->tuple_count += nrows;
                } else {
                        for (i = 0; i < (size_t) result->fieldcnt; i++) {
@@ -5507,6 +5539,7 @@ mapi_fetch_row(MapiHdl hdl)
                                }
                        }
                }
+               result->rows_read++;
                return result->fieldcnt;
        }
 
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -4387,7 +4387,6 @@ bs2_getbuf(stream *ss)
        return (void*) s->buf;
 }
 
-
 void
 bs2_resetbuf(stream *ss)
 {
@@ -4398,6 +4397,17 @@ bs2_resetbuf(stream *ss)
        s->readpos = 0;
 }
 
+buffer 
+bs2_buffer(stream *ss) {
+       bs2 *s = (bs2 *) ss->stream_data.p;
+       buffer b;
+       assert(ss->read == bs2_read);
+       b.buf = s->buf;
+       b.pos = s->nr;
+       b.len = s->bufsiz;
+       return b;
+}
+
 int
 isa_block_stream(stream *s)
 {
diff --git a/common/stream/stream.h b/common/stream/stream.h
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -246,6 +246,8 @@ typedef enum {
 stream_export stream *block_stream2(stream *s, size_t bufsiz, 
compression_method comp);
 stream_export void* bs2_getbuf(stream *ss);
 stream_export void bs2_resetbuf(stream *ss);
+stream_export buffer bs2_buffer(stream *s);
+
 
 /* read block of data including the end of block marker */
 stream_export ssize_t mnstr_read_block(stream *s, void *buf, size_t elmsize, 
size_t cnt);
diff --git a/sql/backends/monet5/sql_result.c b/sql/backends/monet5/sql_result.c
--- a/sql/backends/monet5/sql_result.c
+++ b/sql/backends/monet5/sql_result.c
@@ -1868,6 +1868,7 @@ static int mvc_export_resultset_prot10(r
        size_t i;
        size_t row = 0;
        size_t srow = 0;
+       size_t varsized = 0;
        lng *var_col_len;
        BATiter *iterators;
        lng fixed_lengths = 0;
@@ -1879,8 +1880,6 @@ static int mvc_export_resultset_prot10(r
                return -1;
        }
 
-       (void) bsize;
-
        if (t->order) {
                order = BBPquickdesc(t->order, FALSE);
                if (!order) {
@@ -1904,8 +1903,9 @@ static int mvc_export_resultset_prot10(r
                if (strcasecmp(c->type.type->sqlname, "decimal") == 0) {
                        str res = MAL_SUCCEED;
                int bat_type = ATOMstorage(iterators[i].b->ttype);
-               int hpos = c->type.scale; //this value isn't right, it's always 
3. todo: find the right scale value (i.e. where the decimal point is)
+               int hpos = c->type.scale;
                bat result = 0;
+
                //decimal values can be stored in various numeric fields, so 
check the numeric field and convert the one it's actually stored in
                switch(bat_type)
                {
@@ -1939,7 +1939,10 @@ static int mvc_export_resultset_prot10(r
                }
 
                if (ATOMvarsized(mtype)) {
+                       // FIXME support other types than string
+                       assert(mtype == TYPE_str);
                        typelen = -1;
+                       varsized++;
                } else {
                        fixed_lengths += typelen;
                }
@@ -1955,58 +1958,102 @@ static int mvc_export_resultset_prot10(r
 
        while (row < (size_t) count)    {
                size_t crow = 0;
-               size_t bytes_left = bsize;
+               size_t bytes_left = bsize - sizeof(lng);
                char cont_req, dummy;
-               for (i = 0; i < (size_t) t->nr_cols; i++) {
-                       var_col_len[i] = 0;
+#ifdef PROT10_DEBUG
+               size_t bufpos;
+#endif
+
+               if (varsized == 0) {
+                       // no varsized elements, so we can immediately compute 
the amount of elements
+                       row = srow + bytes_left / fixed_lengths;
+               } else {
+                       // every varsized member has an 8-byte header 
indicating the length of that column's data in the block
+                       // subtract this from the amount of bytes left
+                       bytes_left -= varsized * sizeof(lng); 
+
+                       for (i = 0; i < (size_t) t->nr_cols; i++) {
+                               var_col_len[i] = 0;
+                       }
+
+                       // we have varsized elements, so we have to loop to 
determine how many rows fit into a buffer
+                       while (row < (size_t) count) {
+                               size_t rowsize = fixed_lengths;
+                               for (i = 0; i < (size_t) t->nr_cols; i++) {
+                                       res_col *c = t->cols + i;
+                                       int mtype = c->type.type->localtype;
+                                       if (ATOMvarsized(mtype)) {
+                                               size_t slen = strlen((const 
char*) BUNtail(iterators[i], row)) + 1;
+                                               rowsize += slen;
+                                               var_col_len[i] += slen;
+                                       }
+                               }
+                               if (bytes_left < rowsize) {
+                                       // since we are breaking, we have to 
adjust var_col_len and subtract the current string so it is accurate again
+                                       for (i = 0; i < (size_t) t->nr_cols; 
i++) {
+                                               res_col *c = t->cols + i;
+                                               int mtype = 
c->type.type->localtype;
+                                               if (ATOMvarsized(mtype)) {
+                                                       size_t slen = 
strlen((const char*) BUNtail(iterators[i], row)) + 1;
+                                                       var_col_len[i] -= slen;
+                                               }
+                                       }
+                                       break;
+                               }
+                               bytes_left -= rowsize;
+                               row++;
+                       }
+                       assert(row > srow);
                }
 
-               // FIXME: this can be skipped if there are no variable-length 
types in the result set
-               while (row < (size_t) count) {
-                       size_t rowsize = fixed_lengths;
-                       for (i = 0; i < (size_t) t->nr_cols; i++) {
-                               res_col *c = t->cols + i;
-                               int mtype = c->type.type->localtype;
-                               if (ATOMvarsized(mtype)) {
-                                       // FIXME support other types than string
-                                       size_t slen = strlen((const char*) 
BUNtail(iterators[i], row)) + 1;
-                                       assert(mtype == TYPE_str);
-                                       rowsize += slen;
-                                       var_col_len[i] += slen;
-                               }
-                       }
-                       if (bytes_left < rowsize) break;
-                       bytes_left -= rowsize;
-                       row++;
-               }
-               assert(row > 0);
-
                if (!mnstr_readChr(c, &cont_req)) {
+                       fprintf(stderr, "Received cancellation message.\n");
                        return -1;
                }
+
                // consume flush from client
                mnstr_readChr(c, &dummy);
 
                if (cont_req != 42) {
+                       // received cancellation message, stop writing result
+                       fprintf(stderr, "Received cancellation message.\n");
                        break;
                }
 
+#ifdef PROT10_DEBUG
+               fprintf(stderr, "Write block: %zu - %zu (out of %lld, 
nrow=%lld)\n", srow, row, count, (lng)(row - srow));
+               bufpos = sizeof(lng);
+#endif
 
-               if (!mnstr_writeLng(s, (lng) row)) {
+               assert(bs2_buffer(s).pos == 0);
+
+               if (!mnstr_writeLng(s, (lng)(row - srow))) {
                        return -1;
                }
 
                for (i = 0; i < (size_t) t->nr_cols; i++) {
                        res_col *c = t->cols + i;
                        int mtype = c->type.type->localtype;
+#ifdef PROT10_DEBUG
+                       fprintf(stderr, "Column %d\n", i);
+#endif
                        if (ATOMvarsized(mtype)) {
                                // FIXME support other types than string
                                assert(mtype == TYPE_str);
                                assert((size_t) var_col_len[i] < bsize);
+
+#ifdef PROT10_DEBUG
+                               fprintf(stderr, "Write lng %lld to %zu\n", 
var_col_len[i], bufpos);
+                               bufpos += sizeof(lng);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to