Changeset: 7b55d07971fb for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/7b55d07971fb
Modified Files:
clients/Tests/exports.stable.out
common/stream/bs.c
common/stream/socket_stream.c
common/stream/stream.c
common/stream/stream.h
common/stream/stream_internal.h
Branch: Dec2025
Log Message:
Buffer reads when using block streams.
This also fixes interrupt propagation from client to server when using
UNIX domain sockets.
diffs (truncated from 390 to 300 lines):
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1746,14 +1746,14 @@ int mnstr_fsetpos(stream *restrict s, fp
int mnstr_fsync(stream *s);
buffer *mnstr_get_buffer(stream *s);
bool mnstr_get_swapbytes(const stream *s);
-int mnstr_getoob(const stream *s);
+int mnstr_getoob(stream *s);
int mnstr_init(void);
int mnstr_isalive(const stream *s);
bool mnstr_isbinary(const stream *s);
const char *mnstr_name(const stream *s);
const char *mnstr_peek_error(const stream *s);
int mnstr_printf(stream *restrict s, _In_z_ _Printf_format_string_ const char *restrict format, ...) __attribute__((__format__(__printf__, 2, 3)));
-int mnstr_putoob(const stream *s, char val);
+int mnstr_putoob(stream *s, char val);
ssize_t mnstr_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt);
int mnstr_readBte(stream *restrict s, int8_t *restrict val);
int mnstr_readBteArray(stream *restrict s, int8_t *restrict val, size_t cnt);
diff --git a/common/stream/bs.c b/common/stream/bs.c
--- a/common/stream/bs.c
+++ b/common/stream/bs.c
@@ -154,6 +154,42 @@ bs_flush(stream *ss, mnstr_flush_level f
return 0;
}
+static int
+bs_putoob(stream *ss, char val)
+{
+ bs *s = (bs *) ss->stream_data.p;
+ if (s == NULL || ss->inner->putoob == NULL)
+ return -1;
+ if (!ss->readonly && s->nr > 0) {
+ uint16_t blksize = (uint16_t) (s->nr << 1);
+ s->bytes += s->nr;
+ if (!mnstr_writeSht(ss->inner, (int16_t) blksize) ||
+ ss->inner->write(ss->inner, s->buf, 1, s->nr) != (ssize_t) s->nr) {
+ mnstr_copy_error(ss, ss->inner);
+ s->nr = 0; /* data is lost due to error */
+ return -1;
+ }
+ s->blks++;
+ s->nr = 0;
+ }
+ return ss->inner->putoob(ss->inner, val);
+}
+
+static int
+bs_getoob(stream *ss)
+{
+ bs *s = (bs *) ss->stream_data.p;
+ if (s == NULL || ss->inner->getoob == NULL)
+ return 0;
+ if (s->seenoob) {
+ s->seenoob = false;
+ return s->oobval;
+ }
+ if (ss->readonly && s->itotal == 0)
+ return ss->inner->getoob(ss->inner);
+ return 0;
+}
+
/* Read buffered data and return the number of items read. At the
* flush boundary we will return 0 to indicate the end of a block,
* unless prompt and pstream are set. In that case, only return 0
@@ -170,33 +206,33 @@ bs_flush(stream *ss, mnstr_flush_level f
ssize_t
bs_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
{
- bs *s;
+ bs *s = (bs *) ss->stream_data.p;
size_t todo = cnt * elmsize;
size_t n;
- s = (bs *) ss->stream_data.p;
+ assert(ss->readonly);
+ if (!ss->readonly)
+ return -1;
if (s == NULL)
return -1;
- assert(ss->readonly);
- assert(s->nr <= 1);
- if (s->itotal == 0) {
- int16_t blksize = 0;
-
- if (s->nr) {
- /* We read the closing block but hadn't
- * returned that yet. Return it now, and note
- * that we did by setting s->nr to 0. */
- assert(s->nr == 1);
- s->nr = 0;
- return 0;
- }
-
- assert(s->nr == 0);
-
- /* There is nothing more to read in the current block,
- * so read the count for the next block */
- switch (mnstr_readSht(ss->inner, &blksize)) {
+ cnt = 0; /* #bytes copied to caller */
+ if (s->itotal > 0) {
+ n = s->itotal - s->nr; /* amount readable in current block */
+ if (n > todo)
+ n = todo;
+ memcpy(buf, s->buf + s->nr, n);
+ buf = (void *) ((char *) buf + n);
+ s->nr += n;
+ todo -= n;
+ cnt += n;
+ if (s->nr == s->itotal)
+ s->itotal = s->nr = 0;
+ }
+ while (todo > 0 && !s->seenflush) {
+ uint16_t blksize;
+ assert(s->itotal == 0);
+ switch (mnstr_readSht(ss->inner, (int16_t *) &blksize)) {
case -1:
mnstr_copy_error(ss, ss->inner);
return -1;
@@ -206,101 +242,46 @@ bs_read(stream *restrict ss, void *restr
case 1:
break;
}
- if ((uint16_t) blksize > (BLOCK << 1 | 1)) {
- mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %d", blksize);
+ if (blksize == 0xFFFF) {
+ /* client interrupt */
+ s->seenoob = true;
+ ss->inner->read(ss->inner, &s->oobval, 1, 1);
+ return 0;
+ }
+ if (blksize > (BLOCK << 1 | 1)) {
+ mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %"PRIu16,
+ blksize);
return -1;
}
-#ifdef BSTREAM_DEBUG
- fprintf(stderr, "RC size: %u, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
- fprintf(stderr, "RC %s %u\n", ss->name, (uint16_t) blksize);
-#endif
- s->itotal = (uint16_t) blksize >> 1; /* amount readable */
- /* store whether this was the last block or not */
- s->nr = (uint16_t) blksize & 1;
- s->bytes += s->itotal;
- s->blks++;
- }
-
- /* Fill the caller's buffer. */
- cnt = 0; /* count how much we put into the buffer */
- while (todo > 0) {
- /* there is more data waiting in the current block, so
- * read it */
- n = todo < s->itotal ? todo : s->itotal;
- while (n > 0) {
- ssize_t m = ss->inner->read(ss->inner, buf, 1, n);
-
- if (m <= 0) {
- ss->eof |= ss->inner->eof;
+ assert(s->nr == 0);
+ s->seenflush = blksize & 1;
+ s->itotal = blksize >> 1;
+ ssize_t m;
+ n = 0;
+ do {
+ m = ss->inner->read(ss->inner, s->buf + n, 1, s->itotal - n);
+ if (m < 0) {
mnstr_copy_error(ss, ss->inner);
return -1;
}
-#ifdef BSTREAM_DEBUG
- {
- ssize_t i;
-
- fprintf(stderr, "RD %s %zd \"", ss->name, m);
- for (i = 0; i < m; i++)
- if (' ' <= ((char *) buf)[i] &&
- ((char *) buf)[i] < 127)
- putc(((char *) buf)[i], stderr);
- else
- fprintf(stderr, "\\%03o", ((unsigned char *) buf)[i]);
- fprintf(stderr, "\"\n");
- }
-#endif
- buf = (void *) ((char *) buf + m);
- cnt += (size_t) m;
- n -= (size_t) m;
- s->itotal -= (unsigned) m;
- todo -= (size_t) m;
- }
-
- if (s->itotal == 0) {
- int16_t blksize = 0;
-
- /* The current block has been completely read,
- * so read the count for the next block, only
- * if the previous was not the last one */
- if (s->nr)
- break;
- switch (mnstr_readSht(ss->inner, &blksize)) {
- case -1:
- mnstr_copy_error(ss, ss->inner);
- return -1;
- case 0:
- ss->eof |= ss->inner->eof;
- return 0;
- case 1:
- break;
- }
- if ((uint16_t) blksize > (BLOCK << 1 | 1)) {
- mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %d", blksize);
- return -1;
- }
-#ifdef BSTREAM_DEBUG
- fprintf(stderr, "RC size: %d, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
- fprintf(stderr, "RC %s %d\n", ss->name, s->nr);
- fprintf(stderr, "RC %s %d\n", ss->name, blksize);
-#endif
- s->itotal = (uint16_t) blksize >> 1; /* amount readable */
- /* store whether this was the last block or not */
- s->nr = (uint16_t) blksize & 1;
- s->bytes += s->itotal;
- s->blks++;
- }
+ n += (size_t) m;
+ } while (n < s->itotal);
+ if (n > todo)
+ n = todo;
+ memcpy(buf, s->buf, n);
+ buf = (void *) ((char *) buf + n);
+ s->nr += n;
+ todo -= n;
+ cnt += n;
+ if (s->nr == s->itotal)
+ s->itotal = s->nr = 0;
}
- /* if we got an empty block with the end-of-sequence marker
- * set (low-order bit) we must only return an empty read once,
- * so we must squash the flag that we still have to return an
- * empty read */
- if (todo > 0 && cnt == 0)
- s->nr = 0;
- return (ssize_t) (elmsize > 0 ? cnt / elmsize : 0);
+ if (s->seenflush && cnt == 0) {
+ s->seenflush = false; /* ready for next time */
+ }
+ return elmsize == 0 ? 0 : (ssize_t) (cnt / elmsize);
}
-
-
static void
bs_close(stream *ss)
{
@@ -373,6 +354,10 @@ block_stream(stream *s)
ns->close = bs_close;
ns->destroy = bs_destroy;
ns->stream_data.p = (void *) b;
+ if (ns->putoob)
+ ns->putoob = bs_putoob;
+ if (ns->getoob)
+ ns->getoob = bs_getoob;
return ns;
}
diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c
--- a/common/stream/socket_stream.c
+++ b/common/stream/socket_stream.c
@@ -27,7 +27,7 @@
/* streams working on a socket */
static int
-socket_getoob(const stream *s)
+socket_getoob(stream *s)
{
SOCKET fd = s->stream_data.s;
#ifdef HAVE_POLL
@@ -108,7 +108,7 @@ socket_getoob(const stream *s)
}
static int
-socket_putoob(const stream *s, char val)
+socket_putoob(stream *s, char val)
{
SOCKET fd = s->stream_data.s;
if (send(fd, &val, 1, MSG_OOB) == -1) {
@@ -125,7 +125,7 @@ socket_putoob(const stream *s, char val)
#define OOBMSG1 '\377'
static int
-socket_getoob_unix(const stream *s)
+socket_getoob_unix(stream *s)
{
SOCKET fd = s->stream_data.s;
#ifdef HAVE_POLL
@@ -170,7 +170,7 @@ socket_getoob_unix(const stream *s)
}
static int
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]