This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/scheduled_event_on_event_thread
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 2b2ef9b70419c53f6780bd719a773f3cc34054f0
Author: Pepijn Noltes <[email protected]>
AuthorDate: Mon Jul 3 20:07:51 2023 +0200

    Revert scheduled event usage in remote endpoint poller, because network io 
is used
---
 .../include/endpoint_discovery_poller.h            |  3 +-
 .../src/endpoint_discovery_poller.c                | 67 ++++++++++++----------
 2 files changed, 40 insertions(+), 30 deletions(-)

diff --git 
a/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h 
b/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h
index 7b309ccf..9cf03291 100644
--- 
a/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h
+++ 
b/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h
@@ -41,11 +41,12 @@ struct endpoint_discovery_poller {
     celix_log_helper_t **loghelper;
 
     celix_thread_mutex_t pollerLock;
+    celix_thread_t pollerThread;
 
     unsigned int poll_interval;
     unsigned int poll_timeout;
 
-    long pollEventId;
+    volatile bool running;
 };
 
 celix_status_t endpointDiscoveryPoller_create(discovery_t *discovery, 
celix_bundle_context_t *context, const char* defaultPollEndpoints, 
endpoint_discovery_poller_t **poller);
diff --git 
a/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c 
b/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c
index 50fe245f..5589c846 100644
--- a/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c
+++ b/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c
@@ -47,7 +47,7 @@
 #define DISCOVERY_POLL_TIMEOUT "DISCOVERY_CFG_POLL_TIMEOUT"
 #define DEFAULT_POLL_TIMEOUT "10" // seconds
 
-static void endpointDiscoveryPoller_performPeriodicPoll(void *data);
+static void *endpointDiscoveryPoller_performPeriodicPoll(void *data);
 celix_status_t endpointDiscoveryPoller_poll(endpoint_discovery_poller_t 
*poller, char *url, array_list_pt currentEndpoints);
 static celix_status_t 
endpointDiscoveryPoller_getEndpoints(endpoint_discovery_poller_t *poller, char 
*url, array_list_pt *updatedEndpoints);
 static celix_status_t endpointDiscoveryPoller_endpointDescriptionEquals(const 
void *endpointPtr, const void *comparePtr, bool *equals);
@@ -93,6 +93,7 @@ celix_status_t endpointDiscoveryPoller_create(discovery_t 
*discovery, celix_bund
        (*poller)->poll_interval = atoi(interval);
        (*poller)->poll_timeout = atoi(timeout);
        (*poller)->discovery = discovery;
+       (*poller)->running = false;
        (*poller)->entries = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
 
        const char* sep = ",";
@@ -110,14 +111,9 @@ celix_status_t endpointDiscoveryPoller_create(discovery_t 
*discovery, celix_bund
                return CELIX_BUNDLE_EXCEPTION;
        }
 
-    double intervalInSeconds = (double)(*poller)->poll_interval;
-    celix_scheduled_event_options_t schedOpts = 
CELIX_EMPTY_SCHEDULED_EVENT_OPTIONS;
-    schedOpts.name = "Remote Services Endpoint Discovery Poller";
-    schedOpts.callback = endpointDiscoveryPoller_performPeriodicPoll;
-    schedOpts.callbackData = *poller;
-    schedOpts.initialDelayInSeconds = intervalInSeconds;
-    schedOpts.intervalInSeconds = intervalInSeconds;
-    (*poller)->pollEventId = celix_bundleContext_scheduleEvent(context, 
&schedOpts);
+       (*poller)->running = true;
+
+       status += celixThread_create(&(*poller)->pollerThread, NULL, 
endpointDiscoveryPoller_performPeriodicPoll, *poller);
        status += celixThreadMutex_unlock(&(*poller)->pollerLock);
 
        if(status != CELIX_SUCCESS){
@@ -133,7 +129,9 @@ celix_status_t endpointDiscoveryPoller_create(discovery_t 
*discovery, celix_bund
 celix_status_t endpointDiscoveryPoller_destroy(endpoint_discovery_poller_t 
*poller) {
        celix_status_t status;
 
-    celix_bundleContext_removeScheduledEvent(poller->discovery->context, 
poller->pollEventId);
+       poller->running = false;
+
+       celixThread_join(poller->pollerThread, NULL);
 
        hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries);
        while (hashMapIterator_hasNext(iterator)) {
@@ -292,32 +290,43 @@ celix_status_t 
endpointDiscoveryPoller_poll(endpoint_discovery_poller_t *poller,
        return status;
 }
 
-static void endpointDiscoveryPoller_performPeriodicPoll(void *data) {
-    endpoint_discovery_poller_t *poller = (endpoint_discovery_poller_t *) data;
-    celix_status_t status = celixThreadMutex_lock(&poller->pollerLock);
-    if (status != CELIX_SUCCESS) {
-        celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: failed 
to obtain lock; retrying...");
-    } else {
-        hash_map_iterator_pt iterator = 
hashMapIterator_create(poller->entries);
+static void *endpointDiscoveryPoller_performPeriodicPoll(void *data) {
+       endpoint_discovery_poller_t *poller = (endpoint_discovery_poller_t *) 
data;
+
+       useconds_t interval = (useconds_t) (poller->poll_interval * 1000000L);
+
+       while (poller->running) {
+               usleep(interval);
+               celix_status_t status = 
celixThreadMutex_lock(&poller->pollerLock);
 
-        while (hashMapIterator_hasNext(iterator)) {
-            hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator);
+               if (status != CELIX_SUCCESS) {
+            celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: 
failed to obtain lock; retrying...");
+               } else {
+                       hash_map_iterator_pt iterator = 
hashMapIterator_create(poller->entries);
 
-            char *url = hashMapEntry_getKey(entry);
-            array_list_pt currentEndpoints = hashMapEntry_getValue(entry);
+                       while (hashMapIterator_hasNext(iterator)) {
+                               hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iterator);
 
-            endpointDiscoveryPoller_poll(poller, url, currentEndpoints);
-        }
+                               char *url = hashMapEntry_getKey(entry);
+                               array_list_pt currentEndpoints = 
hashMapEntry_getValue(entry);
 
-        hashMapIterator_destroy(iterator);
-    }
+                               endpointDiscoveryPoller_poll(poller, url, 
currentEndpoints);
+                       }
 
-    status = celixThreadMutex_unlock(&poller->pollerLock);
-    if (status != CELIX_SUCCESS) {
-        celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: failed 
to release lock; retrying...");
-    }
+                       hashMapIterator_destroy(iterator);
+               }
+
+               status = celixThreadMutex_unlock(&poller->pollerLock);
+               if (status != CELIX_SUCCESS) {
+            celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: 
failed to release lock; retrying...");
+               }
+       }
+
+       return NULL;
 }
 
+
+
 struct MemoryStruct {
        char *memory;
        size_t size;

Reply via email to