This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/add_kqueue_to_udpmc
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 9747fcb19423567a8ffd4029f55bc716c168f9e3
Author: Roy Bulter <[email protected]>
AuthorDate: Sat Apr 18 08:31:28 2020 +0200

    add kqueue
---
 .../src/pubsub_udpmc_topic_receiver.c              | 49 +++++++++++++++++-----
 1 file changed, 38 insertions(+), 11 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index abc037f..bb437d6 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -22,7 +22,13 @@
 #include <pubsub/subscriber.h>
 #include <memory.h>
 #include <pubsub_constants.h>
+#if defined(__APPLE__)
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+#else
 #include <sys/epoll.h>
+#endif
 #include <assert.h>
 #include <pubsub_endpoint.h>
 #include <arpa/inet.h>
@@ -32,7 +38,7 @@
 #include "large_udp.h"
 #include "pubsub_udpmc_common.h"
 
-#define MAX_EPOLL_EVENTS        10
+#define MAX_EVENTS        10
 #define RECV_THREAD_TIMEOUT     5
 #define UDP_BUFFER_SIZE         65535
 #define MAX_UDP_SESSIONS        16
@@ -127,8 +133,11 @@ pubsub_udpmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_con
     receiver->recvThread.running = true;
     receiver->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
     receiver->topicEpollFd = epoll_create1(0);
-
-
+#if defined(__APPLE__)
+    receiver->topicEpollFd = kqueue();
+#else
+    receiver->topicEpollFd = epoll_create1(0);
+#endif
     celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
     celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
     celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
@@ -295,9 +304,15 @@ void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_udpmc_topic_receiver_t *rec
     psa_udpmc_requested_connection_entry_t *entry = 
hashMap_remove(receiver->requestedConnections.map, key);
     free(key);
     if (entry != NULL && entry->connected) {
+#if defined(__APPLE__)
+        struct kevent ev;
+        EV_SET (&ev, entry->recvSocket, EVFILT_READ, EV_DELETE, 0, 0, 0);
+        rc = kevent (receiver->topicEpollFd, &ev, 1, NULL, 0, NULL);
+#else
         struct epoll_event ev;
         memset(&ev, 0, sizeof(ev));
         int rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_DEL, 
entry->recvSocket, &ev);
+#endif
         if (rc < 0) {
             fprintf(stderr, "[PSA UDPMC] Error disconnecting TopicReceiver 
%s/%s to %s:%li.\n%s", receiver->scope == NULL ? "(null)" : receiver->scope, 
receiver->topic, socketAddress, socketPort, strerror(errno));
         }
@@ -374,8 +389,6 @@ static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc,
 static void* psa_udpmc_recvThread(void * data) {
     pubsub_udpmc_topic_receiver_t *receiver = data;
 
-    struct epoll_event events[MAX_EPOLL_EVENTS];
-
     celixThreadMutex_lock(&receiver->recvThread.mutex);
     bool running = receiver->recvThread.running;
     celixThreadMutex_unlock(&receiver->recvThread.mutex);
@@ -396,7 +409,15 @@ static void* psa_udpmc_recvThread(void * data) {
             psa_udpmc_initializeAllSubscribers(receiver);
         }
 
-        int nfds = epoll_wait(receiver->topicEpollFd, events, 
MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000);
+        unsigned int timeout = RECV_THREAD_TIMEOUT * 1000;
+#if defined(__APPLE__)
+        struct kevent events[MAX_EVENTS];
+        struct timespec ts = {timeout / 1000, (timeout  % 1000) * 1000000};
+        int nfds = kevent (receiver->topicEpollFd, NULL, 0, &events[0], 
MAX_EVENTS, timeout ? &ts : NULL);
+#else
+        struct epoll_event events[MAX_EVENTS];
+        int nfds = epoll_wait(receiver->topicEpollFd, events, MAX_EVENTS, 
RECV_THREAD_TIMEOUT * 1000);
+#endif
         int i;
         for (i = 0; i < nfds; i++ ) {
             unsigned int index;
@@ -514,11 +535,17 @@ static bool psa_udpmc_connectToEntry(pubsub_udpmc_topic_receiver_t *receiver, ps
         rc = bind(entry->recvSocket, (struct sockaddr*)&mcListenAddr, 
sizeof(mcListenAddr));
     }
     if (entry->recvSocket >= 0 && rc >= 0) {
-        struct epoll_event ev;
-        memset(&ev, 0, sizeof(ev));
-        ev.events = EPOLLIN;
-        ev.data.fd = entry->recvSocket;
-        rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_ADD, 
entry->recvSocket, &ev);
+#if defined(__APPLE__)
+      struct kevent ev;
+      EV_SET (&ev, entry->recvSocket, EVFILT_READ, EV_ADD | EV_ENABLE , 0, 0, 
0);
+      rc = kevent (receiver->topicEpollFd, &ev, 1, NULL, 0, NULL);
+#else
+      struct epoll_event ev;
+      memset(&ev, 0, sizeof(ev));
+      ev.events = EPOLLIN;
+      ev.data.fd = entry->recvSocket;
+      rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_ADD, entry->recvSocket, 
&ev);
+#endif
     }
 
     if (entry->recvSocket < 0 || rc < 0) {

Reply via email to