Changeset: 475af50e54c4 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=475af50e54c4
Branch: Oct2020
Log Message:
merged
diffs (truncated from 707 to 300 lines):
diff --git a/common/stream/Tests/All b/common/stream/Tests/All
--- a/common/stream/Tests/All
+++ b/common/stream/Tests/All
@@ -1,13 +1,13 @@
read_uncompressed
HAVE_LIBBZ2?read_bz2
HAVE_LIBZ?read_gz
-HAVE_LIBLZ4?read_lz4
+HAVE_LIBLZ4&HAVE_PYTHON_LZ4?read_lz4
HAVE_LIBLZMA?read_xz
write_uncompressed
HAVE_LIBBZ2?write_bz2
HAVE_LIBZ?write_gz
-HAVE_LIBLZ4?write_lz4
+HAVE_LIBLZ4&HAVE_PYTHON_LZ4?write_lz4
HAVE_LIBLZMA?write_xz
urlstream
diff --git a/common/stream/Tests/testdata.py b/common/stream/Tests/testdata.py
--- a/common/stream/Tests/testdata.py
+++ b/common/stream/Tests/testdata.py
@@ -2,10 +2,7 @@
# Generating test files and verifying them after transmission.
-import bz2
import gzip
-import lz4.frame
-import lzma
import os
import sys
@@ -208,10 +205,13 @@ class TestFile:
elif self.compression == 'gz':
f = gzip.GzipFile(filename, 'wb', fileobj=fileobj,
mtime=131875200, compresslevel=1)
elif self.compression == 'bz2':
+ import bz2
f = bz2.BZ2File(fileobj, 'wb', compresslevel=1)
elif self.compression == 'xz':
+ import lzma
f = lzma.LZMAFile(fileobj, 'wb', preset=1)
elif self.compression == 'lz4': # ok
+ import lz4.frame
f = lz4.frame.LZ4FrameFile(fileobj, 'wb', compression_level=1)
else:
raise Exception("Unknown compression scheme: " + self.compression)
@@ -225,10 +225,13 @@ class TestFile:
elif self.compression == 'gz':
f = gzip.GzipFile(filename, 'rb', mtime=131875200)
elif self.compression == 'bz2':
+ import bz2
f = bz2.BZ2File(filename, 'rb')
elif self.compression == 'xz':
+ import lzma
f = lzma.LZMAFile(filename, 'rb')
elif self.compression == 'lz4':
+ import lz4.frame
f = lz4.frame.LZ4FrameFile(filename, 'rb')
else:
raise Exception("Unknown compression scheme: " + self.compression)
diff --git a/monetdb5/modules/mal/mal_mapi.c b/monetdb5/modules/mal/mal_mapi.c
--- a/monetdb5/modules/mal/mal_mapi.c
+++ b/monetdb5/modules/mal/mal_mapi.c
@@ -272,60 +272,51 @@ static ATOMIC_TYPE threadno = ATOMIC_VAR
static void
SERVERlistenThread(SOCKET *Sock)
{
- char *msg = 0;
+ char *msg = NULL;
int retval;
- SOCKET sock = Sock[0];
- SOCKET usock = Sock[1];
- SOCKET msgsock = INVALID_SOCKET;
+ SOCKET socks[3] = {Sock[0], Sock[1], Sock[2]};
struct challengedata *data;
MT_Id tid;
stream *s;
+ int i;
GDKfree(Sock);
(void) ATOMIC_INC(&nlistener);
do {
+ SOCKET msgsock = INVALID_SOCKET;
#ifdef HAVE_POLL
- struct pollfd pfd[2];
+ struct pollfd pfd[3];
nfds_t npfd;
npfd = 0;
- if (sock != INVALID_SOCKET)
- pfd[npfd++] = (struct pollfd) {.fd = sock, .events =
POLLIN};
-#ifdef HAVE_SYS_UN_H
- if (usock != INVALID_SOCKET)
- pfd[npfd++] = (struct pollfd) {.fd = usock, .events =
POLLIN};
-#endif
+ for (i = 0; i < 3; i++) {
+ if (socks[i] != INVALID_SOCKET)
+ pfd[npfd++] = (struct pollfd) {.fd = socks[i],
+
.events = POLLIN};
+ }
/* Wait up to 0.1 seconds (0.01 if testing) */
retval = poll(pfd, npfd, GDKdebug & FORCEMITOMASK ? 10 : 100);
if (retval == -1 && errno == EINTR)
continue;
#else
- struct timeval tv;
fd_set fds;
FD_ZERO(&fds);
- if (sock != INVALID_SOCKET)
- FD_SET(sock, &fds);
-#ifdef HAVE_SYS_UN_H
- if (usock != INVALID_SOCKET)
- FD_SET(usock, &fds);
-#endif
+ /* temporarily use msgsock to record the highest socket fd */
+ for (i = 0; i < 3; i++) {
+ if (socks[i] != INVALID_SOCKET) {
+ FD_SET(socks[i], &fds);
+ if (msgsock == INVALID_SOCKET || socks[i] >
msgsock)
+ msgsock = socks[i];
+ }
+ }
/* Wait up to 0.1 seconds (0.01 if testing) */
- tv = (struct timeval) {
+ struct timeval tv = (struct timeval) {
.tv_usec = GDKdebug & FORCEMITOMASK ? 10000 : 100000,
};
- /* temporarily use msgsock to record the larger of sock and
usock */
-#ifdef _MSC_VER
- msgsock = 0; /* value is ignored on Windows
*/
-#else
- msgsock = sock;
-#ifdef HAVE_SYS_UN_H
- if (usock != INVALID_SOCKET && (sock == INVALID_SOCKET || usock
> sock))
- msgsock = usock;
-#endif
-#endif
- retval = select((int)msgsock + 1, &fds, NULL, NULL, &tv);
+ retval = select((int) msgsock + 1, &fds, NULL, NULL, &tv);
+ msgsock = INVALID_SOCKET;
#endif
if (ATOMIC_GET(&serverexiting) || GDKexiting())
break;
@@ -346,38 +337,45 @@ SERVERlistenThread(SOCKET *Sock)
}
continue;
}
- if (sock != INVALID_SOCKET &&
+ bool isusock = false;
#ifdef HAVE_POLL
- (npfd > 0 && pfd[0].fd == sock && pfd[0].revents &
POLLIN)
+ for (i = 0; i < (int) npfd; i++) {
+ if (pfd[i].revents & POLLIN) {
+ msgsock = pfd[i].fd;
+ isusock = msgsock == socks[2];
+ break;
+ }
+ }
#else
- FD_ISSET(sock, &fds)
+ for (i = 0; i < 3; i++) {
+ if (socks[i] >= 0 && FD_ISSET(socks[i], &fds)) {
+ msgsock = socks[i];
+ isusock = i == 2;
+ break;
+ }
+ }
#endif
- ) {
- if ((msgsock = accept4(sock, NULL, NULL, SOCK_CLOEXEC))
== INVALID_SOCKET) {
- if (
+ if (msgsock == INVALID_SOCKET)
+ continue;
+
+ if ((msgsock = accept4(msgsock, NULL, NULL, SOCK_CLOEXEC)) ==
INVALID_SOCKET) {
+ if (
#ifdef _MSC_VER
- WSAGetLastError() != WSAEINTR
+ WSAGetLastError() != WSAEINTR
#else
- errno != EINTR
+ errno != EINTR
#endif
- || !ATOMIC_GET(&serveractive)) {
- msg = "accept failed";
- goto error;
- }
- continue;
+ || !ATOMIC_GET(&serveractive)) {
+ msg = "accept failed";
+ goto error;
}
+ continue;
+ }
#if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4))
- (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
#endif
#ifdef HAVE_SYS_UN_H
- } else if (usock != INVALID_SOCKET &&
-#ifdef HAVE_POLL
- ((npfd > 0 && pfd[0].fd == usock &&
pfd[0].revents & POLLIN) ||
- (npfd > 1 && pfd[1].fd == usock &&
pfd[1].revents & POLLIN))
-#else
- FD_ISSET(usock, &fds)
-#endif
- ) {
+ if (isusock) {
struct msghdr msgh;
struct iovec iov;
char buf[1];
@@ -385,23 +383,6 @@ SERVERlistenThread(SOCKET *Sock)
char ccmsg[CMSG_SPACE(sizeof(int))];
struct cmsghdr *cmsg;
- if ((msgsock = accept4(usock, NULL, NULL,
SOCK_CLOEXEC)) == INVALID_SOCKET) {
- if (
-#ifdef _MSC_VER
- WSAGetLastError() != WSAEINTR
-#else
- errno != EINTR
-#endif
- ) {
- msg = "accept failed";
- goto error;
- }
- continue;
- }
-#if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4))
- (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
-#endif
-
/* BEWARE: unix domain sockets have a slightly different
* behaviour initialy than normal sockets, because we
can
* send filedescriptors or credentials with them. To
do so,
@@ -461,8 +442,6 @@ SERVERlistenThread(SOCKET *Sock)
continue;
}
#endif
- } else {
- continue;
}
data = GDKmalloc(sizeof(*data));
@@ -511,18 +490,14 @@ SERVERlistenThread(SOCKET *Sock)
continue;
}
} while (!ATOMIC_GET(&serverexiting) && !GDKexiting());
+ error:
(void) ATOMIC_DEC(&nlistener);
- if (sock != INVALID_SOCKET)
- closesocket(sock);
- if (usock != INVALID_SOCKET)
- closesocket(usock);
+ for (i = 0; i < 3; i++)
+ if (socks[i] != INVALID_SOCKET)
+ closesocket(socks[i]);
+ if (msg)
+ TRC_CRITICAL(MAL_SERVER, "Terminating listener: %s\n", msg);
return;
-error:
- TRC_CRITICAL(MAL_SERVER, "Terminating listener: %s\n", msg);
- if (sock != INVALID_SOCKET)
- closesocket(sock);
- if (usock != INVALID_SOCKET)
- closesocket(usock);
}
#ifdef _MSC_VER
@@ -542,13 +517,12 @@ start_listen(SOCKET *sockp, int *portp,
.ai_socktype = SOCK_STREAM,
.ai_protocol = IPPROTO_TCP,
};
- struct sockaddr_storage addr;
- SOCKLEN addrlen = (SOCKLEN) sizeof(addr);
int e = 0;
int ipv6_vs6only = -1;
SOCKET sock = INVALID_SOCKET;
const char *err;
- *sockp = INVALID_SOCKET;
+ int nsock = 0;
+ sockp[0] = sockp[1] = INVALID_SOCKET;
host[0] = 0;
if (listenaddr == NULL || strcmp(listenaddr, "localhost") == 0) {
hints.ai_family = AF_INET6;
@@ -583,110 +557,106 @@ start_listen(SOCKET *sockp, int *portp,
}
char sport[8]; /* max "65535" */
snprintf(sport, sizeof(sport), "%d", *portp);
- int check = getaddrinfo(listenaddr, sport, &hints, &result);
- if (check != 0 && ipv6_vs6only == 0) {
- /* if IPv6 didn't work and we can use IPv4 as well, try just
IPv4 */
- hints.ai_family = AF_INET;
- ipv6_vs6only = -1;
- if (listenaddr && strcmp(listenaddr, "::1") == 0)
- listenaddr = "127.0.0.1";
- check = getaddrinfo(listenaddr, sport, &hints, &result);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list