This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/add_kqueue_to_tcp_admin
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 836ad9abadc217cd85cbc3fa7f6403c6801c9b8c
Author: Roy Bulter <[email protected]>
AuthorDate: Tue Apr 14 21:23:27 2020 +0200

    Add Kqueue to TcpAdmin
---
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 129 ++++++++++++++++++---
 .../src/pubsub_tcp_topic_receiver.c                |   1 -
 2 files changed, 115 insertions(+), 15 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 0c2d41e..15c10a5 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -31,7 +31,13 @@
 #include <errno.h>
 #include <array_list.h>
 #include <pthread.h>
+#if defined(__APPLE__)
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+#else
 #include <sys/epoll.h>
+#endif
 #include <limits.h>
 #include <assert.h>
 #include "ctype.h"
@@ -44,9 +50,13 @@
 #include "utils.h"
 #include "pubsub_tcp_handler.h"
 
-#define MAX_EPOLL_EVENTS   64
+#define MAX_EVENTS   64
 #define MAX_DEFAULT_BUFFER_SIZE 4u
 
+#if defined(__APPLE__)
+#define MSG_NOSIGNAL (0)
+#endif
+
 #define L_DEBUG(...) \
     logHelper_log(handle->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
 #define L_INFO(...) \
@@ -143,7 +153,11 @@ static void *pubsub_tcpHandler_thread(void *data);
 pubsub_tcpHandler_t *pubsub_tcpHandler_create(pubsub_protocol_service_t 
*protocol, log_helper_t *logHelper) {
     pubsub_tcpHandler_t *handle = calloc(sizeof(*handle), 1);
     if (handle != NULL) {
+#if defined(__APPLE__)
+        handle->efd = kqueue();
+#else
         handle->efd = epoll_create1(0);
+#endif
         handle->connection_url_map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
         handle->connection_fd_map = hashMap_create(NULL, NULL, NULL, NULL);
         handle->interface_url_map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
@@ -191,8 +205,7 @@ void pubsub_tcpHandler_destroy(pubsub_tcpHandler_t *handle) 
{
                 pubsub_tcpHandler_closeConnectionEntry(handle, entry, true);
             }
         }
-        if (handle->efd >= 0)
-            close(handle->efd);
+        if (handle->efd >= 0) close(handle->efd);
         hashMap_destroy(handle->connection_url_map, false, false);
         hashMap_destroy(handle->connection_fd_map, false, false);
         hashMap_destroy(handle->interface_url_map, false, false);
@@ -418,14 +431,20 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t 
*handle, char *url) {
         }
         // Subscribe File Descriptor to epoll
         if ((rc >= 0) && (entry)) {
+#if defined(__APPLE__)
+            struct kevent ev;
+            EV_SET (&ev, entry->fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
+            rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL);
+#else
             struct epoll_event event;
-            bzero(&event, sizeof(struct epoll_event)); // zero the struct
+            bzero(&event,  sizeof(struct epoll_event)); // zero the struct
             event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT;
             event.data.fd = entry->fd;
             rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
+#endif
             if (rc < 0) {
                 pubsub_tcpHandler_freeEntry(entry);
-                L_ERROR("[TCP Socket] Cannot create epoll %s\n", 
strerror(errno));
+                L_ERROR("[TCP Socket] Cannot create poll event %s\n", 
strerror(errno));
                 entry = NULL;
             }
         }
@@ -466,9 +485,15 @@ static inline int pubsub_tcpHandler_closeConnectionEntry(
         fprintf(stdout, "[TCP Socket] Close connection to url: %s: \n", 
entry->url);
         hashMap_remove(handle->connection_fd_map, (void *) (intptr_t) 
entry->fd);
         if ((handle->efd >= 0)) {
+#if defined(__APPLE__)
+          struct kevent ev;
+          EV_SET (&ev, entry->fd, EVFILT_READ, EV_DELETE , 0, 0, 0);
+          rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL);
+#else
             struct epoll_event event;
             bzero(&event, sizeof(struct epoll_event)); // zero the struct
             rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, entry->fd, &event);
+#endif
             if (rc < 0) {
                 L_ERROR("[PSA TCP] Error disconnecting %s\n", strerror(errno));
             }
@@ -496,9 +521,15 @@ pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t 
*handle,
         fprintf(stdout, "[TCP Socket] Close interface url: %s: \n", 
entry->url);
         hashMap_remove(handle->interface_fd_map, (void *) (intptr_t) 
entry->fd);
         if ((handle->efd >= 0)) {
+#if defined(__APPLE__)
+            struct kevent ev;
+            EV_SET (&ev, entry->fd, EVFILT_READ, EV_DELETE , 0, 0, 0);
+            rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL);
+#else
             struct epoll_event event;
             bzero(&event, sizeof(struct epoll_event)); // zero the struct
             rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, entry->fd, &event);
+#endif
             if (rc < 0) {
                 L_ERROR("[PSA TCP] Error disconnecting %s\n", strerror(errno));
             }
@@ -563,13 +594,19 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, 
char *url) {
             }
         }
         if ((rc >= 0) && (handle->efd >= 0)) {
+#if defined(__APPLE__)
+            struct kevent ev;
+            EV_SET (&ev, fd, EVFILT_READ, EV_ADD | EV_ENABLE , 0, 0, 0);
+            rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL);
+#else
             struct epoll_event event;
             bzero(&event, sizeof(event)); // zero the struct
             event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
             event.data.fd = fd;
             rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, fd, &event);
+#endif
             if (rc < 0) {
-                L_ERROR("[TCP Socket] Cannot create epoll: %s\n", 
strerror(errno));
+                L_ERROR("[TCP Socket] Cannot create poll: %s\n", 
strerror(errno));
                 errno = 0;
                 pubsub_tcpHandler_freeEntry(entry);
                 entry = NULL;
@@ -993,7 +1030,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, 
pubsub_protocol_message
             }
             nbytes = 0;
             if (entry->fd >= 0 && msgSize && headerData)
-                nbytes = sendmsg(entry->fd, &msg, MSG_NOSIGNAL | flags);
+                nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL);
             //  When a specific socket keeps reporting errors can indicate a 
subscriber
             //  which is not active anymore, the connection will remain until 
the retry
             //  counter exceeds the maximum retry count.
@@ -1063,29 +1100,35 @@ char 
*pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle) {
 // Handle non-blocking accept (sender)
 //
 static inline
-void pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, 
psa_tcp_connection_entry_t *pendingConnectionEntry) {
+int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, 
psa_tcp_connection_entry_t *pendingConnectionEntry) {
     celixThreadRwlock_writeLock(&handle->dbLock);
     // new connection available
     struct sockaddr_in their_addr;
     socklen_t len = sizeof(struct sockaddr_in);
-    int fd = accept(pendingConnectionEntry->fd, &their_addr, &len);
+    int fd = accept(pendingConnectionEntry->fd, (struct sockaddr*)&their_addr, 
&len);
     int rc = fd;
     if (rc == -1) {
         L_ERROR("[TCP Socket] accept failed: %s\n", strerror(errno));
     }
     if (rc >= 0) {
         // handle new connection:
-        struct epoll_event event;
-        bzero(&event, sizeof(event)); // zero the struct
         struct sockaddr_in sin;
         getsockname(pendingConnectionEntry->fd, (struct sockaddr *) &sin, 
&len);
         char *interface_url = pubsub_utils_url_get_url(&sin, NULL);
         char *url = pubsub_utils_url_get_url(&their_addr, NULL);
         psa_tcp_connection_entry_t *entry = 
pubsub_tcpHandler_createEntry(handle, fd, url, interface_url, &their_addr);
+#if defined(__APPLE__)
+        struct kevent ev;
+        EV_SET (&ev, entry->fd, EVFILT_READ, EV_ADD | EV_ENABLE , 0, 0, 0);
+        rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL);
+#else
+        struct epoll_event event;
+        bzero(&event, sizeof(event)); // zero the struct
         event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT;
         event.data.fd = entry->fd;
         // Register Read to epoll
         rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
+#endif
         if (rc < 0) {
             pubsub_tcpHandler_freeEntry(entry);
             free(entry);
@@ -1102,6 +1145,7 @@ void pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t 
*handle, psa_tcp_connec
         free(interface_url);
     }
     celixThreadRwlock_unlock(&handle->dbLock);
+    return fd;
 }
 
 //
@@ -1158,18 +1202,74 @@ void 
pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd) {
             if (handle->receiverConnectMessageCallback)
                 
handle->receiverConnectMessageCallback(handle->receiverConnectPayload, 
entry->url, false);
             entry->connected = true;
+#if defined(__APPLE__)
+            rc = 0;
+#else
             struct epoll_event event;
             bzero(&event, sizeof(event)); // zero the struct
             event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
             event.data.fd = fd;
             // Register Modify epoll
             rc = epoll_ctl(handle->efd, EPOLL_CTL_MOD, fd, &event);
+#endif
             if (rc < 0)
-                L_ERROR("[TCP Socket] Cannot create epoll\n");
+                L_ERROR("[TCP Socket] Cannot create poll\n");
         }
     celixThreadRwlock_unlock(&handle->dbLock);
 }
 
+#if defined(__APPLE__)
+//
+// The main socket event loop (kqueue variant).
+//
+// Waits on the handle's kqueue descriptor and dispatches every returned event:
+//  - an event on a listening (interface) socket accepts the pending connection;
+//  - an EV_ERROR event is logged and the socket is closed;
+//  - an EVFILT_READ event is passed to the read handler, unless the peer has
+//    closed the connection (EV_EOF) and no unread data is left, in which case
+//    the socket is closed.
+// Does nothing when the handle has no valid kqueue descriptor.
+//
+static inline
+void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
+    if (handle->efd < 0) {
+        return;
+    }
+    struct kevent events[MAX_EVENTS];
+    // handle->timeout is split into whole seconds and the remainder in nanoseconds,
+    // i.e. it is interpreted as milliseconds.
+    struct timespec ts = {handle->timeout / 1000, (handle->timeout % 1000) * 1000000};
+    // A zero timeout is passed as NULL, which makes kevent() wait indefinitely.
+    // NOTE(review): epoll_wait() returns immediately for a zero timeout - confirm which is intended.
+    int nof_events = kevent(handle->efd, NULL, 0, events, MAX_EVENTS, handle->timeout ? &ts : NULL);
+    if (nof_events < 0) {
+        // An interrupted or temporarily unavailable wait is not reported as an error.
+        if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
+            L_ERROR("[TCP Socket] Cannot create poll wait (%d) %s\n", nof_events, strerror(errno));
+        }
+        return;
+    }
+    for (int i = 0; i < nof_events; i++) {
+        // kevent reports the descriptor in 'ident' (an unsigned type); convert it once
+        // so it can be compared with the signed descriptors stored in the entries.
+        int fd = (int) events[i].ident;
+        // Check whether the event belongs to one of the listening sockets.
+        psa_tcp_connection_entry_t *pendingConnectionEntry = NULL;
+        hash_map_iterator_t iter = hashMapIterator_construct(handle->interface_fd_map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+            if (entry->fd == fd) {
+                pendingConnectionEntry = entry;
+            }
+        }
+        if (pendingConnectionEntry) {
+            int acceptedFd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry);
+            // Only run the connection handler when accept() actually produced a socket.
+            if (acceptedFd >= 0) {
+                pubsub_tcpHandler_connectionHandler(handle, acceptedFd);
+            }
+        } else if (events[i].flags & EV_ERROR) {
+            // For EV_ERROR the error code is delivered in 'data', not in errno.
+            L_ERROR("[TCP Socket]:EV_ERROR ERROR read from socket %s\n", strerror((int) events[i].data));
+            pubsub_tcpHandler_close(handle, fd);
+        } else if (events[i].filter == EVFILT_READ) {
+            // Filter identifiers are plain values, not bit flags, so they are compared with ==.
+            if ((events[i].flags & EV_EOF) && events[i].data <= 0) {
+                // The peer closed the connection and nothing is left to read.
+                int err = 0;
+                socklen_t len = sizeof(int);
+                int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
+                if (rc != 0) {
+                    L_ERROR("[TCP Socket]:EV_EOF ERROR read from socket %s\n", strerror(errno));
+                    continue;
+                }
+                pubsub_tcpHandler_close(handle, fd);
+            } else {
+                // Data is available (possibly followed by EOF): drain it first.
+                pubsub_tcpHandler_readHandler(handle, fd);
+            }
+        }
+    }
+}
+
+#else
+
 //
 // The main socket event loop
 //
@@ -1177,9 +1277,9 @@ static inline
 void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
     int rc = 0;
     if (handle->efd >= 0) {
-        struct epoll_event events[MAX_EPOLL_EVENTS];
         int nof_events = 0;
-        nof_events = epoll_wait(handle->efd, events, MAX_EPOLL_EVENTS, 
handle->timeout);
+        struct epoll_event events[MAX_EVENTS];
+        nof_events = epoll_wait(handle->efd, events, MAX_EVENTS, 
handle->timeout);
         if (nof_events < 0) {
             if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
             } else
@@ -1217,6 +1317,7 @@ void pubsub_tcpHandler_handler(pubsub_tcpHandler_t 
*handle) {
     }
     return;
 }
+#endif
 
 //
 // The socket thread
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 1aa81d5..9fe9f33 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -23,7 +23,6 @@
 #include <pubsub/subscriber.h>
 #include <memory.h>
 #include <pubsub_constants.h>
-#include <sys/epoll.h>
 #include <assert.h>
 #include <pubsub_endpoint.h>
 #include <arpa/inet.h>

Reply via email to