This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/scheduled_event_on_event_thread
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 309ed9cef4aeb31e2a4e9e77f8af5e1fdc4ef809
Author: Pepijn Noltes <[email protected]>
AuthorDate: Thu Jun 15 20:01:05 2023 +0200

    Refactor remote services discovery poller for scheduled event usage
---
 .../include/endpoint_discovery_poller.h            |  3 +-
 .../src/endpoint_discovery_poller.c                | 66 +++++++++-------------
 2 files changed, 29 insertions(+), 40 deletions(-)

diff --git a/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h b/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h
index 9cf03291..7b309ccf 100644
--- a/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h
+++ b/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h
@@ -41,12 +41,11 @@ struct endpoint_discovery_poller {
     celix_log_helper_t **loghelper;
 
     celix_thread_mutex_t pollerLock;
-    celix_thread_t pollerThread;
 
     unsigned int poll_interval;
     unsigned int poll_timeout;
 
-    volatile bool running;
+    long pollEventId;
 };
 
 celix_status_t endpointDiscoveryPoller_create(discovery_t *discovery, celix_bundle_context_t *context, const char* defaultPollEndpoints, endpoint_discovery_poller_t **poller);
diff --git a/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c b/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c
index 5589c846..d8e7e319 100644
--- a/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c
+++ b/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c
@@ -47,7 +47,7 @@
 #define DISCOVERY_POLL_TIMEOUT "DISCOVERY_CFG_POLL_TIMEOUT"
 #define DEFAULT_POLL_TIMEOUT "10" // seconds
 
-static void *endpointDiscoveryPoller_performPeriodicPoll(void *data);
+static void endpointDiscoveryPoller_performPeriodicPoll(void *data);
 celix_status_t endpointDiscoveryPoller_poll(endpoint_discovery_poller_t *poller, char *url, array_list_pt currentEndpoints);
 static celix_status_t endpointDiscoveryPoller_getEndpoints(endpoint_discovery_poller_t *poller, char *url, array_list_pt *updatedEndpoints);
 static celix_status_t endpointDiscoveryPoller_endpointDescriptionEquals(const void *endpointPtr, const void *comparePtr, bool *equals);
@@ -93,7 +93,6 @@ celix_status_t endpointDiscoveryPoller_create(discovery_t *discovery, celix_bund
        (*poller)->poll_interval = atoi(interval);
        (*poller)->poll_timeout = atoi(timeout);
        (*poller)->discovery = discovery;
-       (*poller)->running = false;
        (*poller)->entries = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
        const char* sep = ",";
@@ -111,9 +110,13 @@ celix_status_t endpointDiscoveryPoller_create(discovery_t *discovery, celix_bund
                return CELIX_BUNDLE_EXCEPTION;
        }
 
-       (*poller)->running = true;
-
-       status += celixThread_create(&(*poller)->pollerThread, NULL, endpointDiscoveryPoller_performPeriodicPoll, *poller);
+    double intervalInSeconds = (double)(*poller)->poll_interval;
+    celix_scheduled_event_options_t opts = CELIX_EMPTY_SCHEDULED_EVENT_OPTIONS;
+    opts.callback = endpointDiscoveryPoller_performPeriodicPoll;
+    opts.callbackData = *poller;
+    opts.initialDelayInSeconds = intervalInSeconds;
+    opts.intervalInSeconds = intervalInSeconds;
+    (*poller)->pollEventId = celix_bundleContext_scheduleEvent(context, &opts);
        status += celixThreadMutex_unlock(&(*poller)->pollerLock);
 
        if(status != CELIX_SUCCESS){
@@ -129,9 +132,7 @@ celix_status_t endpointDiscoveryPoller_create(discovery_t *discovery, celix_bund
 celix_status_t endpointDiscoveryPoller_destroy(endpoint_discovery_poller_t *poller) {
        celix_status_t status;
 
-       poller->running = false;
-
-       celixThread_join(poller->pollerThread, NULL);
+    celix_bundleContext_removeScheduledEvent(poller->discovery->context, poller->pollEventId);
 
        hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries);
        while (hashMapIterator_hasNext(iterator)) {
@@ -290,43 +291,32 @@ celix_status_t endpointDiscoveryPoller_poll(endpoint_discovery_poller_t *poller,
        return status;
 }
 
-static void *endpointDiscoveryPoller_performPeriodicPoll(void *data) {
-       endpoint_discovery_poller_t *poller = (endpoint_discovery_poller_t *) data;
-
-       useconds_t interval = (useconds_t) (poller->poll_interval * 1000000L);
-
-       while (poller->running) {
-               usleep(interval);
-               celix_status_t status = celixThreadMutex_lock(&poller->pollerLock);
+static void endpointDiscoveryPoller_performPeriodicPoll(void *data) {
+    endpoint_discovery_poller_t *poller = (endpoint_discovery_poller_t *) data;
+    celix_status_t status = celixThreadMutex_lock(&poller->pollerLock);
+    if (status != CELIX_SUCCESS) {
+        celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: failed to obtain lock; retrying...");
+    } else {
+        hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries);
 
-               if (status != CELIX_SUCCESS) {
-            celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: failed to obtain lock; retrying...");
-               } else {
-                       hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries);
+        while (hashMapIterator_hasNext(iterator)) {
+            hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator);
 
-                       while (hashMapIterator_hasNext(iterator)) {
-                               hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator);
+            char *url = hashMapEntry_getKey(entry);
+            array_list_pt currentEndpoints = hashMapEntry_getValue(entry);
 
-                               char *url = hashMapEntry_getKey(entry);
-                               array_list_pt currentEndpoints = hashMapEntry_getValue(entry);
+            endpointDiscoveryPoller_poll(poller, url, currentEndpoints);
+        }
 
-                               endpointDiscoveryPoller_poll(poller, url, currentEndpoints);
-                       }
+        hashMapIterator_destroy(iterator);
+    }
 
-                       hashMapIterator_destroy(iterator);
-               }
-
-               status = celixThreadMutex_unlock(&poller->pollerLock);
-               if (status != CELIX_SUCCESS) {
-            celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: failed to release lock; retrying...");
-               }
-       }
-
-       return NULL;
+    status = celixThreadMutex_unlock(&poller->pollerLock);
+    if (status != CELIX_SUCCESS) {
+        celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: failed to release lock; retrying...");
+    }
 }
 
-
-
 struct MemoryStruct {
        char *memory;
        size_t size;

Reply via email to