Changeset: fc2322735185 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/fc2322735185
Modified Files:
gdk/gdk.h
gdk/gdk_join.c
Branch: default
Log Message:
Merge with Aug2024 branch.
diffs (truncated from 630 to 300 lines):
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -3730,8 +3730,7 @@ mapi_query_abort(MapiHdl hdl, int reason
mapi_hdl_check(hdl);
mid = hdl->mid;
assert(mid->active == NULL || mid->active == hdl);
- if (mid->oobintr && !hdl->aborted) {
- mnstr_putoob(mid->to, reason);
+ if (mid->oobintr && !hdl->aborted && mnstr_putoob(mid->to, reason) ==
0) {
hdl->aborted = true;
return MOK;
}
diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c
--- a/common/stream/socket_stream.c
+++ b/common/stream/socket_stream.c
@@ -18,11 +18,160 @@
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
+#ifdef HAVE_SYS_IOCTL_H
+#include <sys/ioctl.h>
+#endif
/* ------------------------------------------------------------------ */
/* streams working on a socket */
+static int
+socket_getoob(const stream *s)
+{
+ SOCKET fd = s->stream_data.s;
+#ifdef HAVE_POLL
+ struct pollfd pfd = (struct pollfd) {
+ .fd = fd,
+ .events = POLLPRI,
+ };
+ if (poll(&pfd, 1, 0) > 0)
+#else
+ fd_set fds;
+ struct timeval t = (struct timeval) {
+ .tv_sec = 0,
+ .tv_usec = 0,
+ };
+#ifdef FD_SETSIZE
+ if (fd >= FD_SETSIZE)
+ return 0;
+#endif
+ FD_ZERO(&fds);
+ FD_SET(fd, &fds);
+ if (select(
+#ifdef _MSC_VER
+ 0, /* ignored on Windows */
+#else
+ fd + 1,
+#endif
+ NULL, NULL, &fds, &t) > 0)
+#endif
+ {
+#ifdef HAVE_POLL
+ if (pfd.revents & (POLLHUP | POLLNVAL))
+ return -1;
+ if ((pfd.revents & POLLPRI) == 0)
+ return -1;
+#else
+ if (!FD_ISSET(fd, &fds))
+ return 0;
+#endif
+ /* discard regular data until OOB mark */
+ for (;;) {
+ int atmark = 0;
+ char flush[100];
+ if (ioctlsocket(fd, SIOCATMARK, &atmark) < 0) {
+ perror("ioctl");
+ break;
+ }
+ if (atmark)
+ break;
+ if (recv(fd, flush, sizeof(flush), 0) < 0) {
+ perror("recv");
+ break;
+ }
+ }
+ char b = 0;
+ switch (recv(fd, &b, 1, MSG_OOB)) {
+ case 0:
+ /* unexpectedly didn't receive a byte */
+ break;
+ case 1:
+ return b;
+ case -1:
+ perror("recv OOB");
+ return -1;
+ }
+ }
+ return 0;
+}
+
+static int
+socket_putoob(const stream *s, char val)
+{
+ SOCKET fd = s->stream_data.s;
+ if (send(fd, &val, 1, MSG_OOB) == -1) {
+ perror("send OOB");
+ return -1;
+ }
+ return 0;
+}
+
+#ifdef AF_UNIX
+/* UNIX domain sockets do not support OOB messages, so we need to do
+ * something different */
+#define OOBMSG0 '\377' /* the two special bytes we
send as "OOB" */
+#define OOBMSG1 '\377'
+
+static int
+socket_getoob_unix(const stream *s)
+{
+ SOCKET fd = s->stream_data.s;
+#ifdef HAVE_POLL
+ struct pollfd pfd = (struct pollfd) {
+ .fd = fd,
+ .events = POLLIN,
+ };
+ if (poll(&pfd, 1, 0) > 0)
+#else
+ fd_set fds;
+ struct timeval t = (struct timeval) {
+ .tv_sec = 0,
+ .tv_usec = 0,
+ };
+#ifdef FD_SETSIZE
+ if (fd >= FD_SETSIZE)
+ return 0;
+#endif
+ FD_ZERO(&fds);
+ FD_SET(fd, &fds);
+ if (select(
+#ifdef _MSC_VER
+ 0, /* ignored on Windows */
+#else
+ fd + 1,
+#endif
+ &fds, NULL, NULL, &t) > 0)
+#endif
+ {
+ char buf[3];
+ ssize_t nr;
+ nr = recv(fd, buf, 2, MSG_PEEK);
+ if (nr == 2 && buf[0] == OOBMSG0 && buf[1] == OOBMSG1) {
+ nr = recv(fd, buf, 3, 0);
+ if (nr == 3)
+ return buf[2];
+ }
+ }
+ return 0;
+}
+
+static int
+socket_putoob_unix(const stream *s, char val)
+{
+ char buf[3] = {
+ OOBMSG0,
+ OOBMSG1,
+ val,
+ };
+ if (send(s->stream_data.s, buf, 3, 0) == -1) {
+ perror("send");
+ return -1;
+ }
+ return 0;
+}
+#endif
+
static ssize_t
socket_write(stream *restrict s, const void *restrict buf, size_t elmsize,
size_t cnt)
{
@@ -103,22 +252,21 @@ socket_read(stream *restrict s, void *re
{
#ifdef _MSC_VER
int nr = 0;
+ int size;
+ if (elmsize * cnt > INT_MAX)
+ size = (int) (elmsize * (INT_MAX / elmsize));
+ else
+ size = (int) (elmsize * cnt);
#else
ssize_t nr = 0;
+ size_t size = elmsize * cnt;
#endif
- size_t size = elmsize * cnt;
if (s->errkind != MNSTR_NO__ERROR)
return -1;
if (size == 0)
return 0;
-#ifdef _MSC_VER
- /* recv only takes an int parameter, and read does not accept
- * sockets */
- if (size > INT_MAX)
- size = elmsize * (INT_MAX / elmsize);
-#endif
for (;;) {
if (s->timeout) {
int ret;
@@ -126,7 +274,11 @@ socket_read(stream *restrict s, void *re
struct pollfd pfd;
pfd = (struct pollfd) {.fd = s->stream_data.s,
- .events = POLLIN | POLLPRI};
+ .events = POLLIN};
+#ifdef AF_UNIX
+ if (s->putoob != socket_putoob_unix)
+ pfd.events |= POLLPRI;
+#endif
ret = poll(&pfd, 1, (int) s->timeout);
if (ret == -1 && errno == EINTR)
@@ -136,6 +288,21 @@ socket_read(stream *restrict s, void *re
return -1;
}
if (ret == 1 && pfd.revents & POLLPRI) {
+ /* discard regular data until OOB mark */
+ for (;;) {
+ int atmark = 0;
+ char flush[100];
+ if (ioctlsocket(s->stream_data.s,
SIOCATMARK, &atmark) < 0) {
+ perror("ioctl");
+ break;
+ }
+ if (atmark)
+ break;
+ if (recv(s->stream_data.s, flush,
sizeof(flush), 0) < 0) {
+ perror("recv");
+ break;
+ }
+ }
char b = 0;
switch (recv(s->stream_data.s, &b, 1, MSG_OOB))
{
case 0:
@@ -187,16 +354,22 @@ socket_read(stream *restrict s, void *re
assert(FD_ISSET(s->stream_data.s, &fds));
#endif
}
-#ifdef _MSC_VER
- nr = recv(s->stream_data.s, buf, (int) size, 0);
+ nr = recv(s->stream_data.s, buf, size, 0);
if (nr == SOCKET_ERROR) {
- mnstr_set_error_errno(s, MNSTR_READ_ERROR, "recv");
+ mnstr_set_error_errno(s, errno == EINTR ?
MNSTR_INTERRUPT : MNSTR_READ_ERROR, NULL);
return -1;
}
-#else
- nr = read(s->stream_data.s, buf, size);
- if (nr == -1) {
- mnstr_set_error_errno(s, errno == EINTR ?
MNSTR_INTERRUPT : MNSTR_READ_ERROR, NULL);
+#ifdef AF_UNIX
+ /* when reading a block size in a block stream
+ * (elmsize==2,cnt==1), we may actually get an "OOB" message
+ * when this is a Unix domain socket */
+ if (s->putoob == socket_putoob_unix &&
+ elmsize == 2 && cnt == 1 && nr == 2 &&
+ ((char *)buf)[0] == OOBMSG0 &&
+ ((char *)buf)[1] == OOBMSG1) {
+ /* also read (and discard) the "pay load" */
+ (void) recv(s->stream_data.s, buf, 1, 0);
+ mnstr_set_error(s, MNSTR_INTERRUPT, "query abort from
client");
return -1;
}
#endif
@@ -319,72 +492,6 @@ socket_isalive(const stream *s)
#endif
}
-static int
-socket_getoob(const stream *s)
-{
- SOCKET fd = s->stream_data.s;
-#ifdef HAVE_POLL
- struct pollfd pfd = (struct pollfd) {
- .fd = fd,
- .events = POLLPRI,
- };
- if (poll(&pfd, 1, 0) > 0)
-#else
- fd_set fds;
- struct timeval t = (struct timeval) {
- .tv_sec = 0,
- .tv_usec = 0,
- };
-#ifdef FD_SETSIZE
- if (fd >= FD_SETSIZE)
- return 0;
-#endif
- FD_ZERO(&fds);
- FD_SET(fd, &fds);
- if (select(
-#ifdef _MSC_VER
- 0, /* ignored on Windows */
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]