Repository: celix Updated Branches: refs/heads/develop 109444652 -> fad8ca259
CELIX-257: refactored to allow endpoint poll when adding a new url Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/fad8ca25 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/fad8ca25 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/fad8ca25 Branch: refs/heads/develop Commit: fad8ca259b62ebaf7baa45103c8f418ab179f7d8 Parents: 1094446 Author: Bjoern Petri <[email protected]> Authored: Tue Sep 15 09:21:59 2015 +0200 Committer: Bjoern Petri <[email protected]> Committed: Tue Sep 15 09:21:59 2015 +0200 ---------------------------------------------------------------------- .../private/src/endpoint_discovery_poller.c | 114 ++++++++++--------- 1 file changed, 63 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/fad8ca25/remote_services/discovery/private/src/endpoint_discovery_poller.c ---------------------------------------------------------------------- diff --git a/remote_services/discovery/private/src/endpoint_discovery_poller.c b/remote_services/discovery/private/src/endpoint_discovery_poller.c index ae269cf..ac819c5 100644 --- a/remote_services/discovery/private/src/endpoint_discovery_poller.c +++ b/remote_services/discovery/private/src/endpoint_discovery_poller.c @@ -43,7 +43,9 @@ #define DISCOVERY_POLL_INTERVAL "DISCOVERY_CFG_POLL_INTERVAL" #define DEFAULT_POLL_INTERVAL "10" -static void *endpointDiscoveryPoller_poll(void *data); + +static void *endpointDiscoveryPoller_performPeriodicPoll(void *data); +celix_status_t endpointDiscoveryPoller_poll(endpoint_discovery_poller_pt poller, char *url, array_list_pt currentEndpoints); static celix_status_t endpointDiscoveryPoller_getEndpoints(endpoint_discovery_poller_pt poller, char *url, array_list_pt *updatedEndpoints); static celix_status_t endpointDiscoveryPoller_endpointDescriptionEquals(void *endpointPtr, void *comparePtr, bool *equals); @@ -99,7 +101,7 @@ celix_status_t endpointDiscoveryPoller_create(discovery_pt discovery, bundle_con return CELIX_BUNDLE_EXCEPTION; } - status = celixThread_create(&(*poller)->pollerThread, NULL, endpointDiscoveryPoller_poll, *poller); + status = celixThread_create(&(*poller)->pollerThread, NULL, endpointDiscoveryPoller_performPeriodicPoll, *poller); if (status != CELIX_SUCCESS) { return status; } @@ -187,6 +189,7 @@ celix_status_t endpointDiscoveryPoller_addDiscoveryEndpoint(endpoint_discovery_p if (status == CELIX_SUCCESS) { logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_DEBUG, "ENDPOINT_POLLER: add new discovery endpoint with url %s", url); hashMap_put(poller->entries, strdup(url), endpoints); + endpointDiscoveryPoller_poll(poller, url, endpoints); } } @@ -234,80 +237,89 @@ celix_status_t endpointDiscoveryPoller_removeDiscoveryEndpoint(endpoint_discover return status; } -static void *endpointDiscoveryPoller_poll(void *data) { - endpoint_discovery_poller_pt poller = (endpoint_discovery_poller_pt) data; - useconds_t interval = (useconds_t) (poller->poll_interval * 1000000L); - while (poller->running) { - usleep(interval); - celix_status_t status = celixThreadMutex_lock(&poller->pollerLock); - if (status != CELIX_SUCCESS) { - logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_WARNING, "ENDPOINT_POLLER: failed to obtain lock; retrying..."); - continue; - } +celix_status_t endpointDiscoveryPoller_poll(endpoint_discovery_poller_pt poller, char *url, array_list_pt currentEndpoints) { + celix_status_t status = NULL; + array_list_pt updatedEndpoints = NULL; - hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries); + // create an arraylist with a custom equality test to ensure we can find endpoints properly... + arrayList_createWithEquals(endpointDiscoveryPoller_endpointDescriptionEquals, &updatedEndpoints); + status = endpointDiscoveryPoller_getEndpoints(poller, url, &updatedEndpoints); - while (hashMapIterator_hasNext(iterator)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator); + if (status != CELIX_SUCCESS) { + status = celixThreadMutex_unlock(&poller->pollerLock); + } else { + if (updatedEndpoints) { + for (unsigned int i = arrayList_size(currentEndpoints); i > 0; i--) { + endpoint_description_pt endpoint = arrayList_get(currentEndpoints, i - 1); + + if (!arrayList_contains(updatedEndpoints, endpoint)) { + status = discovery_removeDiscoveredEndpoint(poller->discovery, endpoint); + arrayList_remove(currentEndpoints, i - 1); + endpointDescription_destroy(endpoint); + } + } - char *url = hashMapEntry_getKey(entry); - array_list_pt currentEndpoints = hashMapEntry_getValue(entry); + for (int i = arrayList_size(updatedEndpoints); i > 0; i--) { + endpoint_description_pt endpoint = arrayList_remove(updatedEndpoints, 0); - array_list_pt updatedEndpoints = NULL; - // create an arraylist with a custom equality test to ensure we can find endpoints properly... - arrayList_createWithEquals(endpointDiscoveryPoller_endpointDescriptionEquals, &updatedEndpoints); - status = endpointDiscoveryPoller_getEndpoints(poller, url, &updatedEndpoints); + if (!arrayList_contains(currentEndpoints, endpoint)) { + arrayList_add(currentEndpoints, endpoint); + status = discovery_addDiscoveredEndpoint(poller->discovery, endpoint); + } else { + endpointDescription_destroy(endpoint); - if (status != CELIX_SUCCESS) { - status = celixThreadMutex_unlock(&poller->pollerLock); - continue; + } } + } - if (updatedEndpoints) { - for (unsigned int i = arrayList_size(currentEndpoints); i > 0 ; i--) { - endpoint_description_pt endpoint = arrayList_get(currentEndpoints, i-1); + if (updatedEndpoints) { + arrayList_destroy(updatedEndpoints); + } + } - if (!arrayList_contains(updatedEndpoints, endpoint)) { - status = discovery_removeDiscoveredEndpoint(poller->discovery, endpoint); - arrayList_remove(currentEndpoints, i-1); - endpointDescription_destroy(endpoint); - } - } + return status; +} - for (int i = arrayList_size(updatedEndpoints); i > 0 ; i--) { - endpoint_description_pt endpoint = arrayList_remove(updatedEndpoints, 0); +static void *endpointDiscoveryPoller_performPeriodicPoll(void *data) { + endpoint_discovery_poller_pt poller = (endpoint_discovery_poller_pt) data; - if (!arrayList_contains(currentEndpoints, endpoint)) { - arrayList_add(currentEndpoints, endpoint); - status = discovery_addDiscoveredEndpoint(poller->discovery, endpoint); - } - else { - endpointDescription_destroy(endpoint); + useconds_t interval = (useconds_t) (poller->poll_interval * 1000000L); - } - } - } + while (poller->running) { + usleep(interval); + celix_status_t status = celixThreadMutex_lock(&poller->pollerLock); + + if (status != CELIX_SUCCESS) { + logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_WARNING, "ENDPOINT_POLLER: failed to obtain lock; retrying..."); + } else { + hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries); + + while (hashMapIterator_hasNext(iterator)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator); + + char *url = hashMapEntry_getKey(entry); + array_list_pt currentEndpoints = hashMapEntry_getValue(entry); - if (updatedEndpoints) { - arrayList_destroy(updatedEndpoints); + endpointDiscoveryPoller_poll(poller, url, currentEndpoints); } + hashMapIterator_destroy(iterator); } - hashMapIterator_destroy(iterator); - status = celixThreadMutex_unlock(&poller->pollerLock); if (status != CELIX_SUCCESS) { logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_WARNING, "ENDPOINT_POLLER: failed to release lock; retrying..."); } - } + } - return NULL; + return NULL; } + + struct MemoryStruct { char *memory; size_t size; @@ -366,7 +378,7 @@ static celix_status_t endpointDiscoveryPoller_getEndpoints(endpoint_discovery_po endpointDescriptorReader_destroy(reader); } } else { - logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_ERROR, "ENDPOINT_POLLER: unable to read endpoints, reason: %s", curl_easy_strerror(res)); + logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_ERROR, "ENDPOINT_POLLER: unable to read endpoints from %s, reason: %s", url, curl_easy_strerror(res)); } // clean up endpoints file
