Changeset: 8c72389e014a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8c72389e014a
Modified Files:
clients/mapiclient/mclient.c
clients/mapilib/mapi.c
common/stream/stream.c
common/stream/stream.h
sql/backends/monet5/sql_result.c
Branch: protocol
Log Message:
merge
diffs (truncated from 527 to 300 lines):
diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -806,7 +806,7 @@ CSVrenderer(MapiHdl hdl)
char *s;
char *sep = separator;
int i;
- char buffer[100000];
+ char buffer[BUFSIZ];
char *buffer_ptr;
if (csvheader) {
fields = mapi_get_field_count(hdl);
@@ -825,7 +825,7 @@ CSVrenderer(MapiHdl hdl)
s = mapi_fetch_field(hdl, i);
buffer_ptr = stpcpy(buffer_ptr, s);
- if (i != 0) {
+ if (i != fields - 1) {
*buffer_ptr++ = *sep;
}
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -5488,48 +5488,80 @@ mapi_fetch_row(MapiHdl hdl)
struct MapiResultSet *result;
if (hdl->mid->protocol == prot10 || hdl->mid->protocol ==
prot10compressed) {
+#ifdef PROT10_DEBUG
+ char *initbuf;
+#endif
char* buf;
result = hdl->result;
- result->rows_read++;
+ // check if we have read the entire result set
if (result->rows_read >= result->row_count) {
hdl->mid->active = NULL;
hdl->active = NULL;
+ bs2_resetbuf(hdl->mid->from);
return 0;
}
- // do we have any rows in our cache
- if (result->rows_read > result->tuple_count) {
- // read block from socket
+ // if not, check if our cache is empty
+ if (result->rows_read >= result->tuple_count) {
+ // if our cache is empty, we read data from the socket
lng nrows = 0;
+ // first we write a prompt to the server indicating
that we want another block of the result set
if (!mnstr_writeChr(hdl->mid->to, 42) ||
mnstr_flush(hdl->mid->to)) {
- // FIXME: set hdl->mid to something
-
+ hdl->mid->errorstr = strdup("Failed to write
confirm message to server.");
+ hdl->mid->error = 0;
+ fprintf(stderr, "Failure 2.\n");
return hdl->mid->error;
}
+
bs2_resetbuf(hdl->mid->from); // kinda a bit evil
- // this actually triggers the read of the entire block
from now we operate on buffer
+ assert(bs2_buffer(hdl->mid->from).pos == 0);
+
+ // this actually triggers the read of the entire block
+ // after this point we operate on the buffer
if (!mnstr_readLng(hdl->mid->from, &nrows)) {
// FIXME: set hdl->mid to something
+ hdl->mid->errorstr = strdup("Failed to read row
response");
+ hdl->mid->error = 0;
+ fprintf(stderr, "Failure 3.\n");
return hdl->mid->error;
}
- bs2_resetbuf(hdl->mid->from);
-
-// fprintf(stderr, "nrows=%llu\n", nrows);
+
+
+ //bs2_resetbuf(hdl->mid->from);
+
+#ifdef PROT10_DEBUG
+ fprintf(stderr, "Read block: %llu - %llu (out of %lld,
nrow=%lld)\n", result->rows_read, result->rows_read + nrows, result->row_count,
nrows);
+ initbuf = (char*) bs2_getbuf(hdl->mid->from);
+#endif
+
buf = (char*) bs2_getbuf(hdl->mid->from) + sizeof(lng);
// iterate over cols
for (i = 0; i < (size_t) result->fieldcnt; i++) {
+#ifdef PROT10_DEBUG
+ fprintf(stderr, "Column %zu\n", i);
+#endif
result->fields[i].buffer_ptr = buf;
if (result->fields[i].columnlength < 0) {
// variable-length column
lng col_len = *((lng*) buf);
+#ifdef PROT10_DEBUG
+ fprintf(stderr, "Read lng %lld from
position %zu\n", col_len, buf - initbuf);
+#endif
assert((size_t) col_len <
hdl->mid->blocksize && col_len > 0);
result->fields[i].buffer_ptr +=
sizeof(lng);
buf += col_len + sizeof(lng);
+#ifdef PROT10_DEBUG
+ fprintf(stderr, "Read strings from
position %zu\n", result->fields[i].buffer_ptr - initbuf);
+#endif
} else {
buf += nrows *
result->fields[i].columnlength;
+#ifdef PROT10_DEBUG
+ fprintf(stderr, "Read elements from
position %zu\n", result->fields[i].buffer_ptr - initbuf);
+#endif
}
}
+ assert(result->fields[result->fieldcnt - 1].buffer_ptr
- result->fields[0].buffer_ptr < (long) hdl->mid->blocksize);
result->tuple_count += nrows;
} else {
for (i = 0; i < (size_t) result->fieldcnt; i++) {
@@ -5541,6 +5573,7 @@ mapi_fetch_row(MapiHdl hdl)
}
}
}
+ result->rows_read++;
return result->fieldcnt;
}
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -4233,34 +4233,41 @@ bs2_read(stream *ss, void *buf, size_t e
/* store whether this was the last block or not */
s->nr = blksize & 1;
-
- if (s->itotal > 0 && s->comp == COMPRESSION_SNAPPY) {
-#ifdef HAVE_LIBSNAPPY
+ if (s->itotal > 0) {
// read everything into the comp buf
size_t uncompressed_length = s->bufsiz;
size_t m = 0;
+ char *buf = s->buf;
+ if (s->comp != COMPRESSION_NONE) {
+ buf = s->compbuf;
+ }
snappy_status ret;
while (m < s->itotal) {
ssize_t bytes_read = 0;
- bytes_read = s->s->read(s->s, s->compbuf + m,
1, s->itotal - m);
+ bytes_read = s->s->read(s->s, buf + m, 1,
s->itotal - m);
if (bytes_read <= 0) {
ss->errnr = s->s->errnr;
return -1;
}
m += bytes_read;
}
- if ((ret = snappy_uncompress(s->compbuf, s->itotal,
s->buf, &uncompressed_length)) != SNAPPY_OK) {
- ss->errnr = (int) ret;
- return -1;
+ if (s->comp == COMPRESSION_SNAPPY) {
+#ifdef HAVE_LIBSNAPPY
+ if ((ret = snappy_uncompress(s->compbuf,
s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) {
+ ss->errnr = (int) ret;
+ return -1;
+ }
+#else
+ assert(0);
+ return -1;
+#endif
+ } else {
+ uncompressed_length = m;
}
s->itotal = uncompressed_length;
s->readpos = 0;
}
-#else
- assert(0);
- return 0;
-#endif
}
/* Fill the caller's buffer. */
@@ -4269,45 +4276,14 @@ bs2_read(stream *ss, void *buf, size_t e
/* there is more data waiting in the current block, so
* read it */
n = todo < s->itotal ? todo : s->itotal;
- if (s->comp == COMPRESSION_SNAPPY) {
- memcpy(buf, s->buf + s->readpos, n);
- buf = (void *) ((char *) buf + n);
- cnt += n;
- todo -= n;
- s->readpos += n;
- s->itotal -= n;
-
- } else {
- while (n > 0) {
- ssize_t m = s->s->read(s->s, buf, 1, n);
-
- if (m <= 0) {
- ss->errnr = s->s->errnr;
- return -1;
- }
-
-
-#ifdef BSTREAM_DEBUG
- {
- ssize_t i;
-
- fprintf(stderr, "R2 '%s' %zd \"",
ss->name, m);
- for (i = 0; i < m; i++)
- if (' ' <= ((char *) buf)[i] &&
((char *) buf)[i] < 127)
- putc(((char *) buf)[i],
stderr);
- else
- fprintf(stderr,
"\\%03o", ((char *) buf)[i]);
- fprintf(stderr, "\"\n");
- }
-#endif
-
- buf = (void *) ((char *) buf + m);
- cnt += m;
- n -= m;
- s->itotal -= m;
- todo -= m;
- }
- }
+
+ memcpy(buf, s->buf + s->readpos, n);
+ buf = (void *) ((char *) buf + n);
+ cnt += n;
+ todo -= n;
+ s->readpos += n;
+ s->itotal -= n;
+
if (s->itotal == 0) {
lng blksize = 0;
@@ -4339,33 +4315,41 @@ bs2_read(stream *ss, void *buf, size_t e
/* store whether this was the last block or not */
s->nr = blksize & 1;
- if (s->itotal > 0 && s->comp == COMPRESSION_SNAPPY) {
-#ifdef HAVE_LIBSNAPPY
+ if (s->itotal > 0) {
// read everything into the comp buf
size_t uncompressed_length = s->bufsiz;
size_t m = 0;
+ char *buf = s->buf;
+ if (s->comp != COMPRESSION_NONE) {
+ buf = s->compbuf;
+ }
snappy_status ret;
while (m < s->itotal) {
ssize_t bytes_read = 0;
- bytes_read = s->s->read(s->s,
s->compbuf + m, 1, s->itotal - m);
+ bytes_read = s->s->read(s->s, buf + m,
1, s->itotal - m);
if (bytes_read <= 0) {
ss->errnr = s->s->errnr;
return -1;
}
m += bytes_read;
}
- if ((ret = snappy_uncompress(s->compbuf,
s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) {
- ss->errnr = (int) ret;
- return -1;
+ if (s->comp == COMPRESSION_SNAPPY) {
+#ifdef HAVE_LIBSNAPPY
+ if ((ret =
snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length)) !=
SNAPPY_OK) {
+ ss->errnr = (int) ret;
+ return -1;
+ }
+#else
+ assert(0);
+ return -1;
+#endif
+ } else {
+ uncompressed_length = m;
}
s->itotal = uncompressed_length;
s->readpos = 0;
}
-#else
- assert(0);
- return -1;
-#endif
}
}
/* if we got an empty block with the end-of-sequence marker
@@ -4387,7 +4371,6 @@ bs2_getbuf(stream *ss)
return (void*) s->buf;
}
-
void
bs2_resetbuf(stream *ss)
{
@@ -4398,6 +4381,17 @@ bs2_resetbuf(stream *ss)
s->readpos = 0;
}
+buffer
+bs2_buffer(stream *ss) {
+ bs2 *s = (bs2 *) ss->stream_data.p;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list