Changeset: 7b48ad68e415 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/7b48ad68e415
Modified Files:
        common/stream/bs.c
        common/stream/socket_stream.c
        common/stream/stream_internal.h
Branch: Mar2025
Log Message:

Serialize reading OOB info from stream.


diffs (113 lines):

diff --git a/common/stream/bs.c b/common/stream/bs.c
--- a/common/stream/bs.c
+++ b/common/stream/bs.c
@@ -30,6 +30,11 @@ bs_create(void)
        if ((ns = malloc(sizeof(*ns))) == NULL)
                return NULL;
        *ns = (bs) {0};
+#if !defined(HAVE_PTHREAD_H) && defined(WIN32)
+       InitializeCriticalSection(&ns->lock);
+#else
+       pthread_mutex_init(&ns->lock, NULL);
+#endif
        return ns;
 }
 
@@ -178,16 +183,28 @@ bs_putoob(stream *ss, char val)
 static int
 bs_getoob(stream *ss)
 {
+       int oobval;
        bs *s = (bs *) ss->stream_data.p;
        if (s == NULL || ss->inner->getoob == NULL)
                return 0;
+#if !defined(HAVE_PTHREAD_H) && defined(WIN32)
+       EnterCriticalSection(&s->lock);
+#else
+       pthread_mutex_lock(&s->lock);
+#endif
        if (s->seenoob) {
                s->seenoob = false;
-               return s->oobval;
-       }
-       if (ss->readonly && s->itotal == 0)
-               return ss->inner->getoob(ss->inner);
-       return 0;
+               oobval = s->oobval;
+       } else if (ss->readonly && s->itotal == 0)
+               oobval = ss->inner->getoob(ss->inner);
+       else
+               oobval = 0;
+#if !defined(HAVE_PTHREAD_H) && defined(WIN32)
+       LeaveCriticalSection(&s->lock);
+#else
+       pthread_mutex_unlock(&s->lock);
+#endif
+       return oobval;
 }
 
 /* Read buffered data and return the number of items read.  At the
@@ -306,6 +323,11 @@ bs_destroy(stream *ss)
        if (s) {
                if (ss->inner)
                        ss->inner->destroy(ss->inner);
+#if !defined(HAVE_PTHREAD_H) && defined(WIN32)
+               DeleteCriticalSection(&s->lock);
+#else
+               pthread_mutex_destroy(&s->lock);
+#endif
                free(s);
        }
        destroy_stream(ss);
diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c
--- a/common/stream/socket_stream.c
+++ b/common/stream/socket_stream.c
@@ -92,7 +92,7 @@ socket_getoob(stream *s)
                        }
                }
 #endif
-               char b = 0;
+               unsigned char b = 0;
                switch (recv(fd, &b, 1, MSG_OOB)) {
                case 0:
                        /* unexpectedly didn't receive a byte */
@@ -163,7 +163,7 @@ socket_getoob_unix(stream *s)
                        if (nr == 2 && buf[0] == OOBMSG0 && buf[1] == OOBMSG1) {
                                nr = recv(fd, buf, 3, 0);
                                if (nr == 3)
-                                       return buf[2];
+                                       return (unsigned char) buf[2];
                        }
                }
        return 0;
diff --git a/common/stream/stream_internal.h b/common/stream/stream_internal.h
--- a/common/stream/stream_internal.h
+++ b/common/stream/stream_internal.h
@@ -62,6 +62,9 @@
 #include <lz4.h>
 #include <lz4frame.h>
 #endif
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
 
 #ifndef SHUT_RD
 #define SHUT_RD                0
@@ -259,11 +262,16 @@ stream *open_lz4wastream(const char *res
  * bs2.c should be dropped.*/
 typedef struct bs bs;
 struct bs {
+#if !defined(HAVE_PTHREAD_H) && defined(WIN32)
+       CRITICAL_SECTION lock;
+#else
+       pthread_mutex_t lock;
+#endif
        uint16_t nr;            /* how far we got in buf */
        uint16_t itotal;        /* amount available in current read block */
        bool seenflush;
        bool seenoob;
-       char oobval;
+       unsigned char oobval;
        int64_t blks;           /* read/written blocks (possibly partial) */
        int64_t bytes;          /* read/written bytes */
        char buf[BLOCK];        /* the buffered data (minus the size of
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to