Changeset: b469865d631c for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b469865d631c
Modified Files:
clients/mapilib/mapi.c
common/stream/stream.c
common/stream/stream.h
sql/backends/monet5/sql_result.c
Branch: protocol
Log Message:
Fixed issues with splitting the resultset into multiple protocol messages.
diffs (truncated from 338 to 300 lines):
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -5454,48 +5454,80 @@ mapi_fetch_row(MapiHdl hdl)
struct MapiResultSet *result;
if (hdl->mid->protocol == prot10 || hdl->mid->protocol ==
prot10compressed) {
+#ifdef PROT10_DEBUG
+ char *initbuf;
+#endif
char* buf;
result = hdl->result;
- result->rows_read++;
+ // check if we have read the entire result set
if (result->rows_read >= result->row_count) {
hdl->mid->active = NULL;
hdl->active = NULL;
+ bs2_resetbuf(hdl->mid->from);
return 0;
}
- // do we have any rows in our cache
- if (result->rows_read > result->tuple_count) {
- // read block from socket
+ // if not, check if our cache is empty
+ if (result->rows_read >= result->tuple_count) {
+ // if our cache is empty, we read data from the socket
lng nrows = 0;
+ // first we write a prompt to the server indicating
that we want another block of the result set
if (!mnstr_writeChr(hdl->mid->to, 42) ||
mnstr_flush(hdl->mid->to)) {
- // FIXME: set hdl->mid to something
-
+ hdl->mid->errorstr = strdup("Failed to write
confirm message to server.");
+ hdl->mid->error = 0;
+ fprintf(stderr, "Failure 2.\n");
return hdl->mid->error;
}
+
bs2_resetbuf(hdl->mid->from); // kinda a bit evil
- // this actually triggers the read of the entire block
from now we operate on buffer
+ assert(bs2_buffer(hdl->mid->from).pos == 0);
+
+ // this actually triggers the read of the entire block
+ // after this point we operate on the buffer
if (!mnstr_readLng(hdl->mid->from, &nrows)) {
// FIXME: set hdl->mid to something
+ hdl->mid->errorstr = strdup("Failed to read row
response");
+ hdl->mid->error = 0;
+ fprintf(stderr, "Failure 3.\n");
return hdl->mid->error;
}
- bs2_resetbuf(hdl->mid->from);
-
-// fprintf(stderr, "nrows=%llu\n", nrows);
+
+
+ //bs2_resetbuf(hdl->mid->from);
+
+#ifdef PROT10_DEBUG
+ fprintf(stderr, "Read block: %llu - %llu (out of %lld,
nrow=%lld)\n", result->rows_read, result->rows_read + nrows, result->row_count,
nrows);
+ initbuf = (char*) bs2_getbuf(hdl->mid->from);
+#endif
+
buf = (char*) bs2_getbuf(hdl->mid->from) + sizeof(lng);
// iterate over cols
for (i = 0; i < (size_t) result->fieldcnt; i++) {
+#ifdef PROT10_DEBUG
+ fprintf(stderr, "Column %zu\n", i);
+#endif
result->fields[i].buffer_ptr = buf;
if (result->fields[i].columnlength < 0) {
// variable-length column
lng col_len = *((lng*) buf);
+#ifdef PROT10_DEBUG
+ fprintf(stderr, "Read lng %lld from
position %zu\n", col_len, buf - initbuf);
+#endif
assert((size_t) col_len <
hdl->mid->blocksize && col_len > 0);
result->fields[i].buffer_ptr +=
sizeof(lng);
buf += col_len + sizeof(lng);
+#ifdef PROT10_DEBUG
+ fprintf(stderr, "Read strings from
position %zu\n", result->fields[i].buffer_ptr - initbuf);
+#endif
} else {
buf += nrows *
result->fields[i].columnlength;
+#ifdef PROT10_DEBUG
+ fprintf(stderr, "Read elements from
position %zu\n", result->fields[i].buffer_ptr - initbuf);
+#endif
}
}
+ assert(result->fields[result->fieldcnt - 1].buffer_ptr
- result->fields[0].buffer_ptr < hdl->mid->blocksize);
result->tuple_count += nrows;
} else {
for (i = 0; i < (size_t) result->fieldcnt; i++) {
@@ -5507,6 +5539,7 @@ mapi_fetch_row(MapiHdl hdl)
}
}
}
+ result->rows_read++;
return result->fieldcnt;
}
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -4387,7 +4387,6 @@ bs2_getbuf(stream *ss)
return (void*) s->buf;
}
-
void
bs2_resetbuf(stream *ss)
{
@@ -4398,6 +4397,17 @@ bs2_resetbuf(stream *ss)
s->readpos = 0;
}
+buffer
+bs2_buffer(stream *ss) {
+ bs2 *s = (bs2 *) ss->stream_data.p;
+ buffer b;
+ assert(ss->read == bs2_read);
+ b.buf = s->buf;
+ b.pos = s->nr;
+ b.len = s->bufsiz;
+ return b;
+}
+
int
isa_block_stream(stream *s)
{
diff --git a/common/stream/stream.h b/common/stream/stream.h
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -246,6 +246,8 @@ typedef enum {
stream_export stream *block_stream2(stream *s, size_t bufsiz,
compression_method comp);
stream_export void* bs2_getbuf(stream *ss);
stream_export void bs2_resetbuf(stream *ss);
+stream_export buffer bs2_buffer(stream *s);
+
/* read block of data including the end of block marker */
stream_export ssize_t mnstr_read_block(stream *s, void *buf, size_t elmsize,
size_t cnt);
diff --git a/sql/backends/monet5/sql_result.c b/sql/backends/monet5/sql_result.c
--- a/sql/backends/monet5/sql_result.c
+++ b/sql/backends/monet5/sql_result.c
@@ -1868,6 +1868,7 @@ static int mvc_export_resultset_prot10(r
size_t i;
size_t row = 0;
size_t srow = 0;
+ size_t varsized = 0;
lng *var_col_len;
BATiter *iterators;
lng fixed_lengths = 0;
@@ -1879,8 +1880,6 @@ static int mvc_export_resultset_prot10(r
return -1;
}
- (void) bsize;
-
if (t->order) {
order = BBPquickdesc(t->order, FALSE);
if (!order) {
@@ -1904,8 +1903,9 @@ static int mvc_export_resultset_prot10(r
if (strcasecmp(c->type.type->sqlname, "decimal") == 0) {
str res = MAL_SUCCEED;
int bat_type = ATOMstorage(iterators[i].b->ttype);
- int hpos = c->type.scale; //this value isn't right, it's always
3. todo: find the right scale value (i.e. where the decimal point is)
+ int hpos = c->type.scale;
bat result = 0;
+
//decimal values can be stored in various numeric fields, so
check the numeric field and convert the one it's actually stored in
switch(bat_type)
{
@@ -1939,7 +1939,10 @@ static int mvc_export_resultset_prot10(r
}
if (ATOMvarsized(mtype)) {
+ // FIXME support other types than string
+ assert(mtype == TYPE_str);
typelen = -1;
+ varsized++;
} else {
fixed_lengths += typelen;
}
@@ -1955,58 +1958,102 @@ static int mvc_export_resultset_prot10(r
while (row < (size_t) count) {
size_t crow = 0;
- size_t bytes_left = bsize;
+ size_t bytes_left = bsize - sizeof(lng);
char cont_req, dummy;
- for (i = 0; i < (size_t) t->nr_cols; i++) {
- var_col_len[i] = 0;
+#ifdef PROT10_DEBUG
+ size_t bufpos;
+#endif
+
+ if (varsized == 0) {
+ // no varsized elements, so we can immediately compute
the amount of elements
+ row = srow + bytes_left / fixed_lengths;
+ } else {
+ // every varsized member has an 8-byte header
indicating the length of the header in the block
+ // subtract this from the amount of bytes left
+ bytes_left -= varsized * sizeof(lng);
+
+ for (i = 0; i < (size_t) t->nr_cols; i++) {
+ var_col_len[i] = 0;
+ }
+
+ // we have varsized elements, so we have to loop to
determine how many rows fit into a buffer
+ while (row < (size_t) count) {
+ size_t rowsize = fixed_lengths;
+ for (i = 0; i < (size_t) t->nr_cols; i++) {
+ res_col *c = t->cols + i;
+ int mtype = c->type.type->localtype;
+ if (ATOMvarsized(mtype)) {
+ size_t slen = strlen((const
char*) BUNtail(iterators[i], row)) + 1;
+ rowsize += slen;
+ var_col_len[i] += slen;
+ }
+ }
+ if (bytes_left < rowsize) {
+ // since we are breaking, we have to
adjust var_col_len and subtract the current string so it is accurate again
+ for (i = 0; i < (size_t) t->nr_cols;
i++) {
+ res_col *c = t->cols + i;
+ int mtype =
c->type.type->localtype;
+ if (ATOMvarsized(mtype)) {
+ size_t slen =
strlen((const char*) BUNtail(iterators[i], row)) + 1;
+ var_col_len[i] -= slen;
+ }
+ }
+ break;
+ }
+ bytes_left -= rowsize;
+ row++;
+ }
+ assert(row > srow);
}
- // FIXME: this can be skipped if there are no variable-length
types in the result set
- while (row < (size_t) count) {
- size_t rowsize = fixed_lengths;
- for (i = 0; i < (size_t) t->nr_cols; i++) {
- res_col *c = t->cols + i;
- int mtype = c->type.type->localtype;
- if (ATOMvarsized(mtype)) {
- // FIXME support other types than string
- size_t slen = strlen((const char*)
BUNtail(iterators[i], row)) + 1;
- assert(mtype == TYPE_str);
- rowsize += slen;
- var_col_len[i] += slen;
- }
- }
- if (bytes_left < rowsize) break;
- bytes_left -= rowsize;
- row++;
- }
- assert(row > 0);
-
if (!mnstr_readChr(c, &cont_req)) {
+ fprintf(stderr, "Received cancellation message.\n");
return -1;
}
+
// consume flush from client
mnstr_readChr(c, &dummy);
if (cont_req != 42) {
+ // received cancellation message, stop writing result
+ fprintf(stderr, "Received cancellation message.\n");
break;
}
+#ifdef PROT10_DEBUG
+ fprintf(stderr, "Write block: %zu - %zu (out of %lld,
nrow=%lld)\n", srow, row, count, (lng)(row - srow));
+ bufpos = sizeof(lng);
+#endif
- if (!mnstr_writeLng(s, (lng) row)) {
+ assert(bs2_buffer(s).pos == 0);
+
+ if (!mnstr_writeLng(s, (lng)(row - srow))) {
return -1;
}
for (i = 0; i < (size_t) t->nr_cols; i++) {
res_col *c = t->cols + i;
int mtype = c->type.type->localtype;
+#ifdef PROT10_DEBUG
+ fprintf(stderr, "Column %d\n", i);
+#endif
if (ATOMvarsized(mtype)) {
// FIXME support other types than string
assert(mtype == TYPE_str);
assert((size_t) var_col_len[i] < bsize);
+
+#ifdef PROT10_DEBUG
+ fprintf(stderr, "Write lng %lld to %zu\n",
var_col_len[i], bufpos);
+ bufpos += sizeof(lng);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list