This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/add_kqueue_to_tcp_admin in repository https://gitbox.apache.org/repos/asf/celix.git
commit 836ad9abadc217cd85cbc3fa7f6403c6801c9b8c Author: Roy Bulter <[email protected]> AuthorDate: Tue Apr 14 21:23:27 2020 +0200 Add Kqueue to TcpAdmin --- .../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 129 ++++++++++++++++++--- .../src/pubsub_tcp_topic_receiver.c | 1 - 2 files changed, 115 insertions(+), 15 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c index 0c2d41e..15c10a5 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c @@ -31,7 +31,13 @@ #include <errno.h> #include <array_list.h> #include <pthread.h> +#if defined(__APPLE__) +#include <sys/types.h> +#include <sys/event.h> +#include <sys/time.h> +#else #include <sys/epoll.h> +#endif #include <limits.h> #include <assert.h> #include "ctype.h" @@ -44,9 +50,13 @@ #include "utils.h" #include "pubsub_tcp_handler.h" -#define MAX_EPOLL_EVENTS 64 +#define MAX_EVENTS 64 #define MAX_DEFAULT_BUFFER_SIZE 4u +#if defined(__APPLE__) +#define MSG_NOSIGNAL (0) +#endif + #define L_DEBUG(...) \ logHelper_log(handle->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) #define L_INFO(...) \ @@ -143,7 +153,11 @@ static void *pubsub_tcpHandler_thread(void *data); pubsub_tcpHandler_t *pubsub_tcpHandler_create(pubsub_protocol_service_t *protocol, log_helper_t *logHelper) { pubsub_tcpHandler_t *handle = calloc(sizeof(*handle), 1); if (handle != NULL) { +#if defined(__APPLE__) + handle->efd = kqueue(); +#else handle->efd = epoll_create1(0); +#endif handle->connection_url_map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); handle->connection_fd_map = hashMap_create(NULL, NULL, NULL, NULL); handle->interface_url_map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); @@ -191,8 +205,7 @@ void pubsub_tcpHandler_destroy(pubsub_tcpHandler_t *handle) { pubsub_tcpHandler_closeConnectionEntry(handle, entry, true); } } - if (handle->efd >= 0) - close(handle->efd); + if (handle->efd >= 0) close(handle->efd); hashMap_destroy(handle->connection_url_map, false, false); hashMap_destroy(handle->connection_fd_map, false, false); hashMap_destroy(handle->interface_url_map, false, false); @@ -418,14 +431,20 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) { } // Subscribe File Descriptor to epoll if ((rc >= 0) && (entry)) { +#if defined(__APPLE__) + struct kevent ev; + EV_SET (&ev, entry->fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0); + rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL); +#else struct epoll_event event; - bzero(&event, sizeof(struct epoll_event)); // zero the struct + bzero(&event, sizeof(struct epoll_event)); // zero the struct event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT; event.data.fd = entry->fd; rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event); +#endif if (rc < 0) { pubsub_tcpHandler_freeEntry(entry); - L_ERROR("[TCP Socket] Cannot create epoll %s\n", strerror(errno)); + L_ERROR("[TCP Socket] Cannot create poll event %s\n", strerror(errno)); entry = NULL; } } @@ -466,9 +485,15 @@ static inline int pubsub_tcpHandler_closeConnectionEntry( fprintf(stdout, "[TCP Socket] Close connection to url: %s: \n", entry->url); hashMap_remove(handle->connection_fd_map, (void *) (intptr_t) entry->fd); if ((handle->efd >= 0)) { +#if defined(__APPLE__) + struct kevent ev; + EV_SET (&ev, entry->fd, EVFILT_READ, EV_DELETE , 0, 0, 0); + rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL); +#else struct epoll_event event; bzero(&event, sizeof(struct epoll_event)); // zero the struct rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, entry->fd, &event); +#endif if (rc < 0) { L_ERROR("[PSA TCP] Error disconnecting %s\n", strerror(errno)); } @@ -496,9 +521,15 @@ pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, fprintf(stdout, "[TCP Socket] Close interface url: %s: \n", entry->url); hashMap_remove(handle->interface_fd_map, (void *) (intptr_t) entry->fd); if ((handle->efd >= 0)) { +#if defined(__APPLE__) + struct kevent ev; + EV_SET (&ev, entry->fd, EVFILT_READ, EV_DELETE , 0, 0, 0); + rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL); +#else struct epoll_event event; bzero(&event, sizeof(struct epoll_event)); // zero the struct rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, entry->fd, &event); +#endif if (rc < 0) { L_ERROR("[PSA TCP] Error disconnecting %s\n", strerror(errno)); } @@ -563,13 +594,19 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) { } } if ((rc >= 0) && (handle->efd >= 0)) { +#if defined(__APPLE__) + struct kevent ev; + EV_SET (&ev, fd, EVFILT_READ, EV_ADD | EV_ENABLE , 0, 0, 0); + rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL); +#else struct epoll_event event; bzero(&event, sizeof(event)); // zero the struct event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR; event.data.fd = fd; rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, fd, &event); +#endif if (rc < 0) { - L_ERROR("[TCP Socket] Cannot create epoll: %s\n", strerror(errno)); + L_ERROR("[TCP Socket] Cannot create poll: %s\n", strerror(errno)); errno = 0; pubsub_tcpHandler_freeEntry(entry); entry = NULL; @@ -993,7 +1030,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message } nbytes = 0; if (entry->fd >= 0 && msgSize && headerData) - nbytes = sendmsg(entry->fd, &msg, MSG_NOSIGNAL | flags); + nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL); // When a specific socket keeps reporting errors can indicate a subscriber // which is not active anymore, the connection will remain until the retry // counter exceeds the maximum retry count. @@ -1063,29 +1100,35 @@ char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle) { // Handle non-blocking accept (sender) // static inline -void pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *pendingConnectionEntry) { +int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *pendingConnectionEntry) { celixThreadRwlock_writeLock(&handle->dbLock); // new connection available struct sockaddr_in their_addr; socklen_t len = sizeof(struct sockaddr_in); - int fd = accept(pendingConnectionEntry->fd, &their_addr, &len); + int fd = accept(pendingConnectionEntry->fd, (struct sockaddr*)&their_addr, &len); int rc = fd; if (rc == -1) { L_ERROR("[TCP Socket] accept failed: %s\n", strerror(errno)); } if (rc >= 0) { // handle new connection: - struct epoll_event event; - bzero(&event, sizeof(event)); // zero the struct struct sockaddr_in sin; getsockname(pendingConnectionEntry->fd, (struct sockaddr *) &sin, &len); char *interface_url = pubsub_utils_url_get_url(&sin, NULL); char *url = pubsub_utils_url_get_url(&their_addr, NULL); psa_tcp_connection_entry_t *entry = pubsub_tcpHandler_createEntry(handle, fd, url, interface_url, &their_addr); +#if defined(__APPLE__) + struct kevent ev; + EV_SET (&ev, entry->fd, EVFILT_READ, EV_ADD | EV_ENABLE , 0, 0, 0); + rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL); +#else + struct epoll_event event; + bzero(&event, sizeof(event)); // zero the struct event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT; event.data.fd = entry->fd; // Register Read to epoll rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event); +#endif if (rc < 0) { pubsub_tcpHandler_freeEntry(entry); free(entry); @@ -1102,6 +1145,7 @@ void pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connec free(interface_url); } celixThreadRwlock_unlock(&handle->dbLock); + return fd; } // @@ -1158,18 +1202,74 @@ void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd) { if (handle->receiverConnectMessageCallback) handle->receiverConnectMessageCallback(handle->receiverConnectPayload, entry->url, false); entry->connected = true; +#if defined(__APPLE__) + rc = 0; +#else struct epoll_event event; bzero(&event, sizeof(event)); // zero the struct event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR; event.data.fd = fd; // Register Modify epoll rc = epoll_ctl(handle->efd, EPOLL_CTL_MOD, fd, &event); +#endif if (rc < 0) - L_ERROR("[TCP Socket] Cannot create epoll\n"); + L_ERROR("[TCP Socket] Cannot create poll\n"); } celixThreadRwlock_unlock(&handle->dbLock); } +#if defined(__APPLE__) +// +// The main socket event loop +// +static inline +void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) { + int rc = 0; + if (handle->efd >= 0) { + int nof_events = 0; + // Wait for events. + struct kevent events[MAX_EVENTS]; + struct timespec ts = {handle->timeout / 1000, (handle->timeout % 1000) * 1000000}; + nof_events = kevent (handle->efd, NULL, 0, &events[0], MAX_EVENTS, handle->timeout ? &ts : NULL); + if (nof_events < 0) { + if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) { + } else + L_ERROR("[TCP Socket] Cannot create poll wait (%d) %s\n", nof_events, strerror(errno)); + } + for (int i = 0; i < nof_events; i++) { + hash_map_iterator_t iter = hashMapIterator_construct(handle->interface_fd_map); + psa_tcp_connection_entry_t *pendingConnectionEntry = NULL; + while (hashMapIterator_hasNext(&iter)) { + psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter); + if (events[i].ident == entry->fd) + pendingConnectionEntry = entry; + } + if (pendingConnectionEntry) { + int fd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry); + pubsub_tcpHandler_connectionHandler(handle, fd); + } else if (events[i].filter & EVFILT_READ) { + pubsub_tcpHandler_readHandler(handle, events[i].ident); + } else if (events[i].flags & EV_EOF) { + int err = 0; + socklen_t len = sizeof(int); + rc = getsockopt(events[i].ident, SOL_SOCKET, SO_ERROR, &err, &len); + if (rc != 0) { + L_ERROR("[TCP Socket]:EPOLLRDHUP ERROR read from socket %s\n", strerror(errno)); + continue; + } + pubsub_tcpHandler_close(handle, events[i].ident); + } else if (events[i].flags & EV_ERROR) { + L_ERROR("[TCP Socket]:EPOLLERR ERROR read from socket %s\n", strerror(errno)); + pubsub_tcpHandler_close(handle, events[i].ident); + continue; + } + } + } + return; +} + +#else + // // The main socket event loop // @@ -1177,9 +1277,9 @@ static inline void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) { int rc = 0; if (handle->efd >= 0) { - struct epoll_event events[MAX_EPOLL_EVENTS]; int nof_events = 0; - nof_events = epoll_wait(handle->efd, events, MAX_EPOLL_EVENTS, handle->timeout); + struct epoll_event events[MAX_EVENTS]; + nof_events = epoll_wait(handle->efd, events, MAX_EVENTS, handle->timeout); if (nof_events < 0) { if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) { } else @@ -1217,6 +1317,7 @@ void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) { } return; } +#endif // // The socket thread diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c index 1aa81d5..9fe9f33 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c @@ -23,7 +23,6 @@ #include <pubsub/subscriber.h> #include <memory.h> #include <pubsub_constants.h> -#include <sys/epoll.h> #include <assert.h> #include <pubsub_endpoint.h> #include <arpa/inet.h>
