Changeset: 7b48ad68e415 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/7b48ad68e415
Modified Files:
common/stream/bs.c
common/stream/socket_stream.c
common/stream/stream_internal.h
Branch: Mar2025
Log Message:
Serialize reading OOB info from stream.
diffs (113 lines):
diff --git a/common/stream/bs.c b/common/stream/bs.c
--- a/common/stream/bs.c
+++ b/common/stream/bs.c
@@ -30,6 +30,11 @@ bs_create(void)
if ((ns = malloc(sizeof(*ns))) == NULL)
return NULL;
*ns = (bs) {0};
+#if !defined(HAVE_PTHREAD_H) && defined(WIN32)
+ InitializeCriticalSection(&ns->lock);
+#else
+ pthread_mutex_init(&ns->lock, NULL);
+#endif
return ns;
}
@@ -178,16 +183,28 @@ bs_putoob(stream *ss, char val)
static int
bs_getoob(stream *ss)
{
+ int oobval;
bs *s = (bs *) ss->stream_data.p;
if (s == NULL || ss->inner->getoob == NULL)
return 0;
+#if !defined(HAVE_PTHREAD_H) && defined(WIN32)
+ EnterCriticalSection(&s->lock);
+#else
+ pthread_mutex_lock(&s->lock);
+#endif
if (s->seenoob) {
s->seenoob = false;
- return s->oobval;
- }
- if (ss->readonly && s->itotal == 0)
- return ss->inner->getoob(ss->inner);
- return 0;
+ oobval = s->oobval;
+ } else if (ss->readonly && s->itotal == 0)
+ oobval = ss->inner->getoob(ss->inner);
+ else
+ oobval = 0;
+#if !defined(HAVE_PTHREAD_H) && defined(WIN32)
+ LeaveCriticalSection(&s->lock);
+#else
+ pthread_mutex_unlock(&s->lock);
+#endif
+ return oobval;
}
/* Read buffered data and return the number of items read. At the
@@ -306,6 +323,11 @@ bs_destroy(stream *ss)
if (s) {
if (ss->inner)
ss->inner->destroy(ss->inner);
+#if !defined(HAVE_PTHREAD_H) && defined(WIN32)
+ DeleteCriticalSection(&s->lock);
+#else
+ pthread_mutex_destroy(&s->lock);
+#endif
free(s);
}
destroy_stream(ss);
diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c
--- a/common/stream/socket_stream.c
+++ b/common/stream/socket_stream.c
@@ -92,7 +92,7 @@ socket_getoob(stream *s)
}
}
#endif
- char b = 0;
+ unsigned char b = 0;
switch (recv(fd, &b, 1, MSG_OOB)) {
case 0:
/* unexpectedly didn't receive a byte */
@@ -163,7 +163,7 @@ socket_getoob_unix(stream *s)
if (nr == 2 && buf[0] == OOBMSG0 && buf[1] == OOBMSG1) {
nr = recv(fd, buf, 3, 0);
if (nr == 3)
- return buf[2];
+ return (unsigned char) buf[2];
}
}
return 0;
diff --git a/common/stream/stream_internal.h b/common/stream/stream_internal.h
--- a/common/stream/stream_internal.h
+++ b/common/stream/stream_internal.h
@@ -62,6 +62,9 @@
#include <lz4.h>
#include <lz4frame.h>
#endif
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
#ifndef SHUT_RD
#define SHUT_RD 0
@@ -259,11 +262,16 @@ stream *open_lz4wastream(const char *res
* bs2.c should be dropped.*/
typedef struct bs bs;
struct bs {
+#if !defined(HAVE_PTHREAD_H) && defined(WIN32)
+ CRITICAL_SECTION lock;
+#else
+ pthread_mutex_t lock;
+#endif
uint16_t nr; /* how far we got in buf */
uint16_t itotal; /* amount available in current read block */
bool seenflush;
bool seenoob;
- char oobval;
+ unsigned char oobval;
int64_t blks; /* read/written blocks (possibly partial) */
int64_t bytes; /* read/written bytes */
char buf[BLOCK]; /* the buffered data (minus the size of
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-leave@monetdb.org