Changeset: 7b55d07971fb for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/7b55d07971fb
Modified Files:
        clients/Tests/exports.stable.out
        common/stream/bs.c
        common/stream/socket_stream.c
        common/stream/stream.c
        common/stream/stream.h
        common/stream/stream_internal.h
Branch: Dec2025
Log Message:

Buffer reads when using block streams.
This also fixes interrupt propagation from client to server when using
UNIX domain sockets.


diffs (truncated from 390 to 300 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1746,14 +1746,14 @@ int mnstr_fsetpos(stream *restrict s, fp
 int mnstr_fsync(stream *s);
 buffer *mnstr_get_buffer(stream *s);
 bool mnstr_get_swapbytes(const stream *s);
-int mnstr_getoob(const stream *s);
+int mnstr_getoob(stream *s);
 int mnstr_init(void);
 int mnstr_isalive(const stream *s);
 bool mnstr_isbinary(const stream *s);
 const char *mnstr_name(const stream *s);
 const char *mnstr_peek_error(const stream *s);
 int mnstr_printf(stream *restrict s, _In_z_ _Printf_format_string_ const char *restrict format, ...) __attribute__((__format__(__printf__, 2, 3)));
-int mnstr_putoob(const stream *s, char val);
+int mnstr_putoob(stream *s, char val);
 ssize_t mnstr_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt);
 int mnstr_readBte(stream *restrict s, int8_t *restrict val);
 int mnstr_readBteArray(stream *restrict s, int8_t *restrict val, size_t cnt);
diff --git a/common/stream/bs.c b/common/stream/bs.c
--- a/common/stream/bs.c
+++ b/common/stream/bs.c
@@ -154,6 +154,42 @@ bs_flush(stream *ss, mnstr_flush_level f
        return 0;
 }
 
+static int
+bs_putoob(stream *ss, char val) /* write out any buffered data as a block, then forward val through the inner stream's putoob */
+{
+       bs *s = (bs *) ss->stream_data.p;
+       if (s == NULL || ss->inner->putoob == NULL) /* no block state, or inner stream has no putoob: fail */
+               return -1;
+       if (!ss->readonly && s->nr > 0) { /* writable stream holding s->nr unwritten bytes in s->buf */
+               uint16_t blksize = (uint16_t) (s->nr << 1); /* header is the byte count shifted left once; low bit (read back as the flush flag by bs_read) stays 0 */
+               s->bytes += s->nr; /* counted before the write is attempted */
+               if (!mnstr_writeSht(ss->inner, (int16_t) blksize) ||
+                       ss->inner->write(ss->inner, s->buf, 1, s->nr) != 
(ssize_t) s->nr) {
+                       mnstr_copy_error(ss, ss->inner);
+                       s->nr = 0; /* data is lost due to error */
+                       return -1;
+               }
+               s->blks++;
+               s->nr = 0; /* buffer is empty again */
+       }
+       return ss->inner->putoob(ss->inner, val); /* result of the inner putoob is returned unchanged */
+}
+
+static int
+bs_getoob(stream *ss) /* return a pending OOB value: one recorded by bs_read, otherwise what the inner stream's getoob reports, otherwise 0 */
+{
+       bs *s = (bs *) ss->stream_data.p;
+       if (s == NULL || ss->inner->getoob == NULL) /* no block state, or inner stream has no getoob: report 0 */
+               return 0;
+       if (s->seenoob) { /* set by bs_read when it reads a block size of 0xFFFF */
+               s->seenoob = false; /* consume it so the value is returned only once */
+               return s->oobval;
+       }
+       if (ss->readonly && s->itotal == 0) /* only ask the inner stream when reading and no block is partially consumed */
+               return ss->inner->getoob(ss->inner);
+       return 0;
+}
+
 /* Read buffered data and return the number of items read.  At the
  * flush boundary we will return 0 to indicate the end of a block,
  * unless prompt and pstream are set. In that case, only return 0
@@ -170,33 +206,33 @@ bs_flush(stream *ss, mnstr_flush_level f
 ssize_t
 bs_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
 {
-       bs *s;
+       bs *s = (bs *) ss->stream_data.p;
        size_t todo = cnt * elmsize;
        size_t n;
 
-       s = (bs *) ss->stream_data.p;
+       assert(ss->readonly);
+       if (!ss->readonly)
+               return -1;
        if (s == NULL)
                return -1;
-       assert(ss->readonly);
-       assert(s->nr <= 1);
 
-       if (s->itotal == 0) {
-               int16_t blksize = 0;
-
-               if (s->nr) {
-                       /* We read the closing block but hadn't
-                        * returned that yet. Return it now, and note
-                        * that we did by setting s->nr to 0. */
-                       assert(s->nr == 1);
-                       s->nr = 0;
-                       return 0;
-               }
-
-               assert(s->nr == 0);
-
-               /* There is nothing more to read in the current block,
-                * so read the count for the next block */
-               switch (mnstr_readSht(ss->inner, &blksize)) {
+       cnt = 0;                                        /* #bytes copied to caller */
+       if (s->itotal > 0) {
+               n = s->itotal - s->nr;  /* amount readable in current block */
+               if (n > todo)
+                       n = todo;
+               memcpy(buf, s->buf + s->nr, n);
+               buf = (void *) ((char *) buf + n);
+               s->nr += n;
+               todo -= n;
+               cnt += n;
+               if (s->nr == s->itotal)
+                       s->itotal = s->nr = 0;
+       }
+       while (todo > 0 && !s->seenflush) {
+               uint16_t blksize;
+               assert(s->itotal == 0);
+               switch (mnstr_readSht(ss->inner, (int16_t *) &blksize)) {
                case -1:
                        mnstr_copy_error(ss, ss->inner);
                        return -1;
@@ -206,101 +242,46 @@ bs_read(stream *restrict ss, void *restr
                case 1:
                        break;
                }
-               if ((uint16_t) blksize > (BLOCK << 1 | 1)) {
-                       mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %d", blksize);
+               if (blksize == 0xFFFF) {
+                       /* client interrupt */
+                       s->seenoob = true;
+                       ss->inner->read(ss->inner, &s->oobval, 1, 1);
+                       return 0;
+               }
+               if (blksize > (BLOCK << 1 | 1)) {
+                       mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %"PRIu16,
+                                                       blksize);
                        return -1;
                }
-#ifdef BSTREAM_DEBUG
-               fprintf(stderr, "RC size: %u, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
-               fprintf(stderr, "RC %s %u\n", ss->name, (uint16_t) blksize);
-#endif
-               s->itotal = (uint16_t) blksize >> 1;    /* amount readable */
-               /* store whether this was the last block or not */
-               s->nr = (uint16_t) blksize & 1;
-               s->bytes += s->itotal;
-               s->blks++;
-       }
-
-       /* Fill the caller's buffer. */
-       cnt = 0;                /* count how much we put into the buffer */
-       while (todo > 0) {
-               /* there is more data waiting in the current block, so
-                * read it */
-               n = todo < s->itotal ? todo : s->itotal;
-               while (n > 0) {
-                       ssize_t m = ss->inner->read(ss->inner, buf, 1, n);
-
-                       if (m <= 0) {
-                               ss->eof |= ss->inner->eof;
+               assert(s->nr == 0);
+               s->seenflush = blksize & 1;
+               s->itotal = blksize >> 1;
+               ssize_t m;
+               n = 0;
+               do {
+                       m = ss->inner->read(ss->inner, s->buf + n, 1, s->itotal - n);
+                       if (m < 0) {
                                mnstr_copy_error(ss, ss->inner);
                                return -1;
                        }
-#ifdef BSTREAM_DEBUG
-                       {
-                               ssize_t i;
-
-                               fprintf(stderr, "RD %s %zd \"", ss->name, m);
-                               for (i = 0; i < m; i++)
-                                       if (' ' <= ((char *) buf)[i] &&
-                                           ((char *) buf)[i] < 127)
-                                               putc(((char *) buf)[i], stderr);
-                                       else
-                                               fprintf(stderr, "\\%03o", ((unsigned char *) buf)[i]);
-                               fprintf(stderr, "\"\n");
-                       }
-#endif
-                       buf = (void *) ((char *) buf + m);
-                       cnt += (size_t) m;
-                       n -= (size_t) m;
-                       s->itotal -= (unsigned) m;
-                       todo -= (size_t) m;
-               }
-
-               if (s->itotal == 0) {
-                       int16_t blksize = 0;
-
-                       /* The current block has been completely read,
-                        * so read the count for the next block, only
-                        * if the previous was not the last one */
-                       if (s->nr)
-                               break;
-                       switch (mnstr_readSht(ss->inner, &blksize)) {
-                       case -1:
-                               mnstr_copy_error(ss, ss->inner);
-                               return -1;
-                       case 0:
-                               ss->eof |= ss->inner->eof;
-                               return 0;
-                       case 1:
-                               break;
-                       }
-                       if ((uint16_t) blksize > (BLOCK << 1 | 1)) {
-                               mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %d", blksize);
-                               return -1;
-                       }
-#ifdef BSTREAM_DEBUG
-                       fprintf(stderr, "RC size: %d, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
-                       fprintf(stderr, "RC %s %d\n", ss->name, s->nr);
-                       fprintf(stderr, "RC %s %d\n", ss->name, blksize);
-#endif
-                       s->itotal = (uint16_t) blksize >> 1;    /* amount readable */
-                       /* store whether this was the last block or not */
-                       s->nr = (uint16_t) blksize & 1;
-                       s->bytes += s->itotal;
-                       s->blks++;
-               }
+                       n += (size_t) m;
+               } while (n < s->itotal);
+               if (n > todo)
+                       n = todo;
+               memcpy(buf, s->buf, n);
+               buf = (void *) ((char *) buf + n);
+               s->nr += n;
+               todo -= n;
+               cnt += n;
+               if (s->nr == s->itotal)
+                       s->itotal = s->nr = 0;
        }
-       /* if we got an empty block with the end-of-sequence marker
-        * set (low-order bit) we must only return an empty read once,
-        * so we must squash the flag that we still have to return an
-        * empty read */
-       if (todo > 0 && cnt == 0)
-               s->nr = 0;
-       return (ssize_t) (elmsize > 0 ? cnt / elmsize : 0);
+       if (s->seenflush && cnt == 0) {
+               s->seenflush = false;   /* ready for next time */
+       }
+       return elmsize == 0 ? 0 : (ssize_t) (cnt / elmsize);
 }
 
-
-
 static void
 bs_close(stream *ss)
 {
@@ -373,6 +354,10 @@ block_stream(stream *s)
        ns->close = bs_close;
        ns->destroy = bs_destroy;
        ns->stream_data.p = (void *) b;
+       if (ns->putoob)
+               ns->putoob = bs_putoob;
+       if (ns->getoob)
+               ns->getoob = bs_getoob;
 
        return ns;
 }
diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c
--- a/common/stream/socket_stream.c
+++ b/common/stream/socket_stream.c
@@ -27,7 +27,7 @@
 /* streams working on a socket */
 
 static int
-socket_getoob(const stream *s)
+socket_getoob(stream *s)
 {
        SOCKET fd = s->stream_data.s;
 #ifdef HAVE_POLL
@@ -108,7 +108,7 @@ socket_getoob(const stream *s)
 }
 
 static int
-socket_putoob(const stream *s, char val)
+socket_putoob(stream *s, char val)
 {
        SOCKET fd = s->stream_data.s;
        if (send(fd, &val, 1, MSG_OOB) == -1) {
@@ -125,7 +125,7 @@ socket_putoob(const stream *s, char val)
 #define OOBMSG1        '\377'
 
 static int
-socket_getoob_unix(const stream *s)
+socket_getoob_unix(stream *s)
 {
        SOCKET fd = s->stream_data.s;
 #ifdef HAVE_POLL
@@ -170,7 +170,7 @@ socket_getoob_unix(const stream *s)
 }
 
 static int
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to