This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/scheduled_event_on_event_thread in repository https://gitbox.apache.org/repos/asf/celix.git
commit 309ed9cef4aeb31e2a4e9e77f8af5e1fdc4ef809 Author: Pepijn Noltes <[email protected]> AuthorDate: Thu Jun 15 20:01:05 2023 +0200 Refactor remote services discovery poller for scheduled event usage --- .../include/endpoint_discovery_poller.h | 3 +- .../src/endpoint_discovery_poller.c | 66 +++++++++------------- 2 files changed, 29 insertions(+), 40 deletions(-) diff --git a/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h b/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h index 9cf03291..7b309ccf 100644 --- a/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h +++ b/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h @@ -41,12 +41,11 @@ struct endpoint_discovery_poller { celix_log_helper_t **loghelper; celix_thread_mutex_t pollerLock; - celix_thread_t pollerThread; unsigned int poll_interval; unsigned int poll_timeout; - volatile bool running; + long pollEventId; }; celix_status_t endpointDiscoveryPoller_create(discovery_t *discovery, celix_bundle_context_t *context, const char* defaultPollEndpoints, endpoint_discovery_poller_t **poller); diff --git a/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c b/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c index 5589c846..d8e7e319 100644 --- a/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c +++ b/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c @@ -47,7 +47,7 @@ #define DISCOVERY_POLL_TIMEOUT "DISCOVERY_CFG_POLL_TIMEOUT" #define DEFAULT_POLL_TIMEOUT "10" // seconds -static void *endpointDiscoveryPoller_performPeriodicPoll(void *data); +static void endpointDiscoveryPoller_performPeriodicPoll(void *data); celix_status_t endpointDiscoveryPoller_poll(endpoint_discovery_poller_t *poller, char *url, array_list_pt currentEndpoints); static celix_status_t endpointDiscoveryPoller_getEndpoints(endpoint_discovery_poller_t *poller, char *url, array_list_pt *updatedEndpoints); static celix_status_t endpointDiscoveryPoller_endpointDescriptionEquals(const void *endpointPtr, const void *comparePtr, bool *equals); @@ -93,7 +93,6 @@ celix_status_t endpointDiscoveryPoller_create(discovery_t *discovery, celix_bund (*poller)->poll_interval = atoi(interval); (*poller)->poll_timeout = atoi(timeout); (*poller)->discovery = discovery; - (*poller)->running = false; (*poller)->entries = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); const char* sep = ","; @@ -111,9 +110,13 @@ celix_status_t endpointDiscoveryPoller_create(discovery_t *discovery, celix_bund return CELIX_BUNDLE_EXCEPTION; } - (*poller)->running = true; - - status += celixThread_create(&(*poller)->pollerThread, NULL, endpointDiscoveryPoller_performPeriodicPoll, *poller); + double intervalInSeconds = (double)(*poller)->poll_interval; + celix_scheduled_event_options_t opts = CELIX_EMPTY_SCHEDULED_EVENT_OPTIONS; + opts.callback = endpointDiscoveryPoller_performPeriodicPoll; + opts.callbackData = *poller; + opts.initialDelayInSeconds = intervalInSeconds; + opts.intervalInSeconds = intervalInSeconds; + (*poller)->pollEventId = celix_bundleContext_scheduleEvent(context, &opts); status += celixThreadMutex_unlock(&(*poller)->pollerLock); if(status != CELIX_SUCCESS){ @@ -129,9 +132,7 @@ celix_status_t endpointDiscoveryPoller_create(discovery_t *discovery, celix_bund celix_status_t endpointDiscoveryPoller_destroy(endpoint_discovery_poller_t *poller) { celix_status_t status; - poller->running = false; - - celixThread_join(poller->pollerThread, NULL); + celix_bundleContext_removeScheduledEvent(poller->discovery->context, poller->pollEventId); hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries); while (hashMapIterator_hasNext(iterator)) { @@ -290,43 +291,32 @@ celix_status_t endpointDiscoveryPoller_poll(endpoint_discovery_poller_t *poller, return status; } -static void *endpointDiscoveryPoller_performPeriodicPoll(void *data) { - endpoint_discovery_poller_t *poller = (endpoint_discovery_poller_t *) data; - - useconds_t interval = (useconds_t) (poller->poll_interval * 1000000L); - - while (poller->running) { - usleep(interval); - celix_status_t status = celixThreadMutex_lock(&poller->pollerLock); +static void endpointDiscoveryPoller_performPeriodicPoll(void *data) { + endpoint_discovery_poller_t *poller = (endpoint_discovery_poller_t *) data; + celix_status_t status = celixThreadMutex_lock(&poller->pollerLock); + if (status != CELIX_SUCCESS) { + celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: failed to obtain lock; retrying..."); + } else { + hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries); - if (status != CELIX_SUCCESS) { - celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: failed to obtain lock; retrying..."); - } else { - hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries); + while (hashMapIterator_hasNext(iterator)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator); - while (hashMapIterator_hasNext(iterator)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator); + char *url = hashMapEntry_getKey(entry); + array_list_pt currentEndpoints = hashMapEntry_getValue(entry); - char *url = hashMapEntry_getKey(entry); - array_list_pt currentEndpoints = hashMapEntry_getValue(entry); + endpointDiscoveryPoller_poll(poller, url, currentEndpoints); + } - endpointDiscoveryPoller_poll(poller, url, currentEndpoints); - } + hashMapIterator_destroy(iterator); + } - hashMapIterator_destroy(iterator); - } - - status = celixThreadMutex_unlock(&poller->pollerLock); - if (status != CELIX_SUCCESS) { - celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: failed to release lock; retrying..."); - } - } - - return NULL; + status = celixThreadMutex_unlock(&poller->pollerLock); + if (status != CELIX_SUCCESS) { + celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: failed to release lock; retrying..."); + } } - - struct MemoryStruct { char *memory; size_t size;
