Changeset: 78b4cf3f8581 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/78b4cf3f8581
Modified Files:
clients/Tests/exports.stable.out
Branch: default
Log Message:
Merge with Aug2024 branch.
diffs (truncated from 542 to 300 lines):
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -717,6 +717,7 @@ MapiMsg mapi_seek_row(MapiHdl hdl, int64
MapiHdl mapi_send(Mapi mid, const char *cmd) __attribute__((__nonnull__(1)));
MapiMsg mapi_setAutocommit(Mapi mid, bool autocommit)
__attribute__((__nonnull__(1)));
MapiMsg mapi_set_columnar_protocol(Mapi mid, bool columnar_protocol)
__attribute__((__nonnull__(1)));
+MapiMsg mapi_set_rtimeout(Mapi mid, unsigned int timeout, bool
(*callback)(void *), void *callback_data) __attribute__((__nonnull__(1)));
MapiMsg mapi_set_size_header(Mapi mid, bool value)
__attribute__((__nonnull__(1)));
MapiMsg mapi_set_time_zone(Mapi mid, int seconds_east_of_utc)
__attribute__((__nonnull__(1)));
MapiMsg mapi_set_timeout(Mapi mid, unsigned int timeout, bool (*callback)(void
*), void *callback_data) __attribute__((__nonnull__(1)));
@@ -1738,6 +1739,7 @@ int mnstr_readStr(stream *restrict s, ch
ssize_t mnstr_read_block(stream *restrict s, void *restrict buf, size_t
elmsize, size_t cnt);
ssize_t mnstr_readline(stream *restrict s, void *restrict buf, size_t maxcnt);
void mnstr_set_bigendian(stream *s, bool bigendian);
+void mnstr_set_error(stream *s, mnstr_error_kind kind, const char *fmt, ...)
__attribute__((__format__(__printf__, 3, 4)));
void mnstr_settimeout(stream *s, unsigned int ms, bool (*func)(void *), void
*data);
const char *mnstr_version(void);
ssize_t mnstr_write(stream *restrict s, const void *restrict buf, size_t
elmsize, size_t cnt);
diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -25,6 +25,7 @@
# include "getopt.h"
# endif
#endif
+#include "stream.h"
#include "mapi.h"
#include <unistd.h>
#include <string.h>
@@ -38,7 +39,6 @@
#include <readline/history.h>
#include "ReadlineTools.h"
#endif
-#include "stream.h"
#include "msqldump.h"
#define LIBMUTILS 1
#include "mprompt.h"
@@ -1308,6 +1308,10 @@ sigint_handler(int signum)
(void) signum;
state = INTERRUPT;
+#ifndef HAVE_SIGACTION
+ if (signal(signum, sigint_handler) == SIG_ERR)
+ perror("Could not reinstall sigal handler");
+#endif
#ifdef HAVE_LIBREADLINE
readline_int_handler();
#endif
@@ -2289,7 +2293,7 @@ doFile(Mapi mid, stream *fp, bool useins
char *newbuf;
state = READING;
l = mnstr_readline(fp, buf + length, bufsiz - length);
- if (l == -1 && state == INTERRUPT) {
+ if (l <= 0 && state == INTERRUPT) {
/* we were interrupted */
mnstr_clearerr(fp);
mnstr_write(toConsole, "\n", 1, 1);
@@ -3293,8 +3297,19 @@ isfile(FILE *fp)
return true;
}
+static bool
+interrupted(void *m)
+{
+ Mapi mid = m;
+ if (state == INTERRUPT) {
+ mnstr_set_error(mapi_get_from(mid), MNSTR_INTERRUPT, NULL);
+ return true;
+ }
+ return false;
+}
+
static void
-catch_interrupts(void)
+catch_interrupts(Mapi mid)
{
#ifdef HAVE_SIGACTION
struct sigaction sa;
@@ -3309,6 +3324,7 @@ catch_interrupts(void)
perror("Could not install signal handler");
}
#endif
+ mapi_set_rtimeout(mid, 100, interrupted, mid);
}
int
@@ -3743,7 +3759,7 @@ main(int argc, char **argv)
if (!has_fileargs && command == NULL && isatty(fileno(stdin))) {
char *lang;
- catch_interrupts();
+ catch_interrupts(mid);
if (mode == SQL) {
lang = "/SQL";
@@ -3843,7 +3859,7 @@ main(int argc, char **argv)
if (s == NULL) {
if (strcmp(arg, "-") == 0) {
- catch_interrupts();
+ catch_interrupts(mid);
s = stdin_rastream();
} else {
s = open_rastream(arg);
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -2401,6 +2401,16 @@ prepareQuery(MapiHdl hdl, const char *cm
MapiMsg
+mapi_set_rtimeout(Mapi mid, unsigned int timeout, bool (*callback)(void *),
void *callback_data)
+{
+ mapi_check(mid);
+ if (mid->trace)
+ printf("Set timeout to %u\n", timeout);
+ mnstr_settimeout(mid->from, timeout, callback, callback_data);
+ return MOK;
+}
+
+MapiMsg
mapi_set_timeout(Mapi mid, unsigned int timeout, bool (*callback)(void *),
void *callback_data)
{
mapi_check(mid);
diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h
--- a/clients/mapilib/mapi.h
+++ b/clients/mapilib/mapi.h
@@ -208,6 +208,8 @@ mapi_export MapiMsg mapi_cache_freeup(Ma
mapi_export MapiMsg mapi_seek_row(MapiHdl hdl, int64_t rowne, int whence)
__attribute__((__nonnull__(1)));
+mapi_export MapiMsg mapi_set_rtimeout(Mapi mid, unsigned int timeout, bool
(*callback)(void *), void *callback_data)
+ __attribute__((__nonnull__(1)));
mapi_export MapiMsg mapi_set_timeout(Mapi mid, unsigned int timeout, bool
(*callback)(void *), void *callback_data)
__attribute__((__nonnull__(1)));
mapi_export MapiMsg mapi_timeout(Mapi mid, unsigned int time)
diff --git a/common/stream/socket_stream.c b/common/stream/socket_stream.c
--- a/common/stream/socket_stream.c
+++ b/common/stream/socket_stream.c
@@ -37,24 +37,26 @@ socket_getoob(const stream *s)
};
if (poll(&pfd, 1, 0) > 0)
#else
- fd_set fds;
+ fd_set xfds;
struct timeval t = (struct timeval) {
.tv_sec = 0,
.tv_usec = 0,
};
+#ifndef _MSC_VER
#ifdef FD_SETSIZE
if (fd >= FD_SETSIZE)
return 0;
#endif
- FD_ZERO(&fds);
- FD_SET(fd, &fds);
+#endif
+ FD_ZERO(&xfds);
+ FD_SET(fd, &xfds);
if (select(
#ifdef _MSC_VER
0, /* ignored on Windows */
#else
fd + 1,
#endif
- NULL, NULL, &fds, &t) > 0)
+ NULL, NULL, &xfds, &t) > 0)
#endif
{
#ifdef HAVE_POLL
@@ -63,10 +65,11 @@ socket_getoob(const stream *s)
if ((pfd.revents & POLLPRI) == 0)
return -1;
#else
- if (!FD_ISSET(fd, &fds))
+ if (!FD_ISSET(fd, &xfds))
return 0;
#endif
/* discard regular data until OOB mark */
+#ifndef _MSC_VER /* Windows has to be
different... */
for (;;) {
int atmark = 0;
char flush[100];
@@ -81,6 +84,7 @@ socket_getoob(const stream *s)
break;
}
}
+#endif
char b = 0;
switch (recv(fd, &b, 1, MSG_OOB)) {
case 0:
@@ -107,7 +111,7 @@ socket_putoob(const stream *s, char val)
return 0;
}
-#ifdef AF_UNIX
+#ifdef HAVE_SYS_UN_H
/* UNIX domain sockets do not support OOB messages, so we need to do
* something different */
#define OOBMSG0 '\377' /* the two special bytes we
send as "OOB" */
@@ -129,10 +133,12 @@ socket_getoob_unix(const stream *s)
.tv_sec = 0,
.tv_usec = 0,
};
+#ifndef _MSC_VER
#ifdef FD_SETSIZE
if (fd >= FD_SETSIZE)
return 0;
#endif
+#endif
FD_ZERO(&fds);
FD_SET(fd, &fds);
if (select(
@@ -176,7 +182,7 @@ static ssize_t
socket_write(stream *restrict s, const void *restrict buf, size_t elmsize,
size_t cnt)
{
size_t size = elmsize * cnt, res = 0;
-#ifdef NATIVE_WIN32
+#ifdef _MSC_VER
int nr = 0;
#else
ssize_t nr = 0;
@@ -191,12 +197,14 @@ socket_write(stream *restrict s, const v
errno = 0;
while (res < size &&
(
-#ifdef NATIVE_WIN32
- /* send works on int, make sure the argument fits */
- ((nr = send(s->stream_data.s, (const char *) buf + res,
(int) min(size - res, 1 << 16), 0)) > 0)
+ /* Windows send works on int, make sure the argument
fits */
+ ((nr = send(s->stream_data.s, (const char *) buf + res,
+#ifdef _MSC_VER
+ (int) min(size - res, 1 <<
16)
#else
- ((nr = write(s->stream_data.s, (const char *) buf + res,
size - res)) > 0)
+ size
#endif
+ , 0)) > 0)
|| (nr < 0 && /* syscall failed */
s->timeout > 0 && /* potentially timeout */
#ifdef _MSC_VER
@@ -210,7 +218,7 @@ socket_write(stream *restrict s, const v
#endif
s->timeout_func != NULL && /* callback function
exists */
!s->timeout_func(s->timeout_data)) /* callback
says don't stop */
- ||(nr < 0 &&
+ || (nr < 0 &&
#ifdef _MSC_VER
WSAGetLastError() == WSAEINTR
#else
@@ -275,20 +283,82 @@ socket_read(stream *restrict s, void *re
pfd = (struct pollfd) {.fd = s->stream_data.s,
.events = POLLIN};
-#ifdef AF_UNIX
+#ifdef HAVE_SYS_UN_H
if (s->putoob != socket_putoob_unix)
pfd.events |= POLLPRI;
#endif
ret = poll(&pfd, 1, (int) s->timeout);
- if (ret == -1 && errno == EINTR)
- continue;
- if (ret == -1 || (pfd.revents & POLLERR)) {
+ if (ret == -1) {
+ if (errno == EINTR)
+ continue;
mnstr_set_error_errno(s, MNSTR_READ_ERROR,
"poll error");
return -1;
}
- if (ret == 1 && pfd.revents & POLLPRI) {
+ if (ret == 1) {
+ if (pfd.revents & POLLHUP) {
+ /* hung up, return EOF */
+ s->eof = true;
+ return 0;
+ }
+ if (pfd.revents & POLLPRI) {
+ /* discard regular data until OOB mark
*/
+ for (;;) {
+ int atmark = 0;
+ char flush[100];
+ if
(ioctlsocket(s->stream_data.s, SIOCATMARK, &atmark) < 0) {
+ perror("ioctl");
+ break;
+ }
+ if (atmark)
+ break;
+ if (recv(s->stream_data.s,
flush, sizeof(flush), 0) < 0) {
+ perror("recv");
+ break;
+ }
+ }
+ char b = 0;
+ switch (recv(s->stream_data.s, &b, 1,
MSG_OOB)) {
+ case 0:
+ /* unexpectedly didn't receive
a byte */
+ continue;
+ case 1:
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]