This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/scheduled_event_on_event_thread in repository https://gitbox.apache.org/repos/asf/celix.git
commit 2b2ef9b70419c53f6780bd719a773f3cc34054f0 Author: Pepijn Noltes <[email protected]> AuthorDate: Mon Jul 3 20:07:51 2023 +0200 Revert scheduled event usage in remote endpoint poller, because network io is used --- .../include/endpoint_discovery_poller.h | 3 +- .../src/endpoint_discovery_poller.c | 67 ++++++++++++---------- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h b/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h index 7b309ccf..9cf03291 100644 --- a/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h +++ b/bundles/remote_services/discovery_common/include/endpoint_discovery_poller.h @@ -41,11 +41,12 @@ struct endpoint_discovery_poller { celix_log_helper_t **loghelper; celix_thread_mutex_t pollerLock; + celix_thread_t pollerThread; unsigned int poll_interval; unsigned int poll_timeout; - long pollEventId; + volatile bool running; }; celix_status_t endpointDiscoveryPoller_create(discovery_t *discovery, celix_bundle_context_t *context, const char* defaultPollEndpoints, endpoint_discovery_poller_t **poller); diff --git a/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c b/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c index 50fe245f..5589c846 100644 --- a/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c +++ b/bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c @@ -47,7 +47,7 @@ #define DISCOVERY_POLL_TIMEOUT "DISCOVERY_CFG_POLL_TIMEOUT" #define DEFAULT_POLL_TIMEOUT "10" // seconds -static void endpointDiscoveryPoller_performPeriodicPoll(void *data); +static void *endpointDiscoveryPoller_performPeriodicPoll(void *data); celix_status_t endpointDiscoveryPoller_poll(endpoint_discovery_poller_t *poller, char *url, array_list_pt currentEndpoints); static celix_status_t endpointDiscoveryPoller_getEndpoints(endpoint_discovery_poller_t *poller, char *url, array_list_pt *updatedEndpoints); static celix_status_t endpointDiscoveryPoller_endpointDescriptionEquals(const void *endpointPtr, const void *comparePtr, bool *equals); @@ -93,6 +93,7 @@ celix_status_t endpointDiscoveryPoller_create(discovery_t *discovery, celix_bund (*poller)->poll_interval = atoi(interval); (*poller)->poll_timeout = atoi(timeout); (*poller)->discovery = discovery; + (*poller)->running = false; (*poller)->entries = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); const char* sep = ","; @@ -110,14 +111,9 @@ celix_status_t endpointDiscoveryPoller_create(discovery_t *discovery, celix_bund return CELIX_BUNDLE_EXCEPTION; } - double intervalInSeconds = (double)(*poller)->poll_interval; - celix_scheduled_event_options_t schedOpts = CELIX_EMPTY_SCHEDULED_EVENT_OPTIONS; - schedOpts.name = "Remote Services Endpoint Discovery Poller"; - schedOpts.callback = endpointDiscoveryPoller_performPeriodicPoll; - schedOpts.callbackData = *poller; - schedOpts.initialDelayInSeconds = intervalInSeconds; - schedOpts.intervalInSeconds = intervalInSeconds; - (*poller)->pollEventId = celix_bundleContext_scheduleEvent(context, &schedOpts); + (*poller)->running = true; + + status += celixThread_create(&(*poller)->pollerThread, NULL, endpointDiscoveryPoller_performPeriodicPoll, *poller); status += celixThreadMutex_unlock(&(*poller)->pollerLock); if(status != CELIX_SUCCESS){ @@ -133,7 +129,9 @@ celix_status_t endpointDiscoveryPoller_create(discovery_t *discovery, celix_bund celix_status_t endpointDiscoveryPoller_destroy(endpoint_discovery_poller_t *poller) { celix_status_t status; - celix_bundleContext_removeScheduledEvent(poller->discovery->context, poller->pollEventId); + poller->running = false; + + celixThread_join(poller->pollerThread, NULL); hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries); while (hashMapIterator_hasNext(iterator)) { @@ -292,32 +290,43 @@ celix_status_t endpointDiscoveryPoller_poll(endpoint_discovery_poller_t *poller, return status; } -static void endpointDiscoveryPoller_performPeriodicPoll(void *data) { - endpoint_discovery_poller_t *poller = (endpoint_discovery_poller_t *) data; - celix_status_t status = celixThreadMutex_lock(&poller->pollerLock); - if (status != CELIX_SUCCESS) { - celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: failed to obtain lock; retrying..."); - } else { - hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries); +static void *endpointDiscoveryPoller_performPeriodicPoll(void *data) { + endpoint_discovery_poller_t *poller = (endpoint_discovery_poller_t *) data; + + useconds_t interval = (useconds_t) (poller->poll_interval * 1000000L); + + while (poller->running) { + usleep(interval); + celix_status_t status = celixThreadMutex_lock(&poller->pollerLock); - while (hashMapIterator_hasNext(iterator)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator); + if (status != CELIX_SUCCESS) { + celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: failed to obtain lock; retrying..."); + } else { + hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries); - char *url = hashMapEntry_getKey(entry); - array_list_pt currentEndpoints = hashMapEntry_getValue(entry); + while (hashMapIterator_hasNext(iterator)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator); - endpointDiscoveryPoller_poll(poller, url, currentEndpoints); - } + char *url = hashMapEntry_getKey(entry); + array_list_pt currentEndpoints = hashMapEntry_getValue(entry); - hashMapIterator_destroy(iterator); - } + endpointDiscoveryPoller_poll(poller, url, currentEndpoints); + } - status = celixThreadMutex_unlock(&poller->pollerLock); - if (status != CELIX_SUCCESS) { - celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: failed to release lock; retrying..."); - } + hashMapIterator_destroy(iterator); + } + + status = celixThreadMutex_unlock(&poller->pollerLock); + if (status != CELIX_SUCCESS) { + celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: failed to release lock; retrying..."); + } + } + + return NULL; } + + struct MemoryStruct { char *memory; size_t size;
