This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/add_kqueue_to_udpmc in repository https://gitbox.apache.org/repos/asf/celix.git
commit 9747fcb19423567a8ffd4029f55bc716c168f9e3
Author: Roy Bulter <[email protected]>
AuthorDate: Sat Apr 18 08:31:28 2020 +0200

    add kqueue
---
 .../src/pubsub_udpmc_topic_receiver.c | 49 +++++++++++++++++-----
 1 file changed, 38 insertions(+), 11 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index abc037f..bb437d6 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -22,7 +22,13 @@
 #include <pubsub/subscriber.h>
 #include <memory.h>
 #include <pubsub_constants.h>
+#if defined(__APPLE__)
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+#else
 #include <sys/epoll.h>
+#endif
 #include <assert.h>
 #include <pubsub_endpoint.h>
 #include <arpa/inet.h>
@@ -32,7 +38,7 @@
 #include "large_udp.h"
 #include "pubsub_udpmc_common.h"

-#define MAX_EPOLL_EVENTS 10
+#define MAX_EVENTS 10
 #define RECV_THREAD_TIMEOUT 5
 #define UDP_BUFFER_SIZE 65535
 #define MAX_UDP_SESSIONS 16
@@ -127,8 +133,11 @@ pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_con
     receiver->recvThread.running = true;
     receiver->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
     receiver->topicEpollFd = epoll_create1(0);
-
-
+#if defined(__APPLE__)
+    receiver->topicEpollFd = kqueue();
+#else
+    receiver->topicEpollFd = epoll_create1(0);
+#endif
     celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
     celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
     celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
@@ -295,9 +304,15 @@ void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_udpmc_topic_receiver_t *rec
     psa_udpmc_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, key);
     free(key);
     if (entry != NULL && entry->connected) {
+#if defined(__APPLE__)
+        struct kevent ev;
+        EV_SET (&ev, entry->recvSocket, EVFILT_READ, EV_DELETE, 0, 0, 0);
+        rc = kevent (receiver->topicEpollFd, &ev, 1, NULL, 0, NULL);
+#else
         struct epoll_event ev;
         memset(&ev, 0, sizeof(ev));
         int rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_DEL, entry->recvSocket, &ev);
+#endif
         if (rc < 0) {
             fprintf(stderr, "[PSA UDPMC] Error disconnecting TopicReceiver %s/%s to %s:%li.\n%s", receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic, socketAddress, socketPort, strerror(errno));
         }
@@ -374,8 +389,6 @@ static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc,
 static void* psa_udpmc_recvThread(void * data) {
     pubsub_udpmc_topic_receiver_t *receiver = data;

-    struct epoll_event events[MAX_EPOLL_EVENTS];
-
     celixThreadMutex_lock(&receiver->recvThread.mutex);
     bool running = receiver->recvThread.running;
     celixThreadMutex_unlock(&receiver->recvThread.mutex);
@@ -396,7 +409,15 @@ static void* psa_udpmc_recvThread(void * data) {
             psa_udpmc_initializeAllSubscribers(receiver);
         }

-        int nfds = epoll_wait(receiver->topicEpollFd, events, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000);
+        unsigned int timeout = RECV_THREAD_TIMEOUT * 1000;
+#if defined(__APPLE__)
+        struct kevent events[MAX_EVENTS];
+        struct timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
+        int nfds = kevent (receiver->topicEpollFd, NULL, 0, &events[0], MAX_EVENTS, timeout ? &ts : NULL);
+#else
+        struct epoll_event events[MAX_EVENTS];
+        int nfds = epoll_wait(receiver->topicEpollFd, events, MAX_EVENTS, RECV_THREAD_TIMEOUT * 1000);
+#endif
         int i;
         for (i = 0; i < nfds; i++ ) {
             unsigned int index;
@@ -514,11 +535,17 @@ static bool psa_udpmc_connectToEntry(pubsub_udpmc_topic_receiver_t *receiver, ps
             rc = bind(entry->recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr));
         }
         if (entry->recvSocket >= 0 && rc >= 0) {
-            struct epoll_event ev;
-            memset(&ev, 0, sizeof(ev));
-            ev.events = EPOLLIN;
-            ev.data.fd = entry->recvSocket;
-            rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_ADD, entry->recvSocket, &ev);
+#if defined(__APPLE__)
+            struct kevent ev;
+            EV_SET (&ev, entry->recvSocket, EVFILT_READ, EV_ADD | EV_ENABLE , 0, 0, 0);
+            rc = kevent (receiver->topicEpollFd, &ev, 1, NULL, 0, NULL);
+#else
+            struct epoll_event ev;
+            memset(&ev, 0, sizeof(ev));
+            ev.events = EPOLLIN;
+            ev.data.fd = entry->recvSocket;
+            rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_ADD, entry->recvSocket, &ev);
+#endif
         }

         if (entry->recvSocket < 0 || rc < 0) {
