Repository: celix
Updated Branches:
  refs/heads/develop 109444652 -> fad8ca259


CELIX-257: refactored the endpoint discovery poller so that an endpoint is polled immediately when a new URL is added


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/fad8ca25
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/fad8ca25
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/fad8ca25

Branch: refs/heads/develop
Commit: fad8ca259b62ebaf7baa45103c8f418ab179f7d8
Parents: 1094446
Author: Bjoern Petri <[email protected]>
Authored: Tue Sep 15 09:21:59 2015 +0200
Committer: Bjoern Petri <[email protected]>
Committed: Tue Sep 15 09:21:59 2015 +0200

----------------------------------------------------------------------
 .../private/src/endpoint_discovery_poller.c     | 114 ++++++++++---------
 1 file changed, 63 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/fad8ca25/remote_services/discovery/private/src/endpoint_discovery_poller.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery/private/src/endpoint_discovery_poller.c 
b/remote_services/discovery/private/src/endpoint_discovery_poller.c
index ae269cf..ac819c5 100644
--- a/remote_services/discovery/private/src/endpoint_discovery_poller.c
+++ b/remote_services/discovery/private/src/endpoint_discovery_poller.c
@@ -43,7 +43,9 @@
 #define DISCOVERY_POLL_INTERVAL "DISCOVERY_CFG_POLL_INTERVAL"
 #define DEFAULT_POLL_INTERVAL "10"
 
-static void *endpointDiscoveryPoller_poll(void *data);
+
+static void *endpointDiscoveryPoller_performPeriodicPoll(void *data);
+celix_status_t endpointDiscoveryPoller_poll(endpoint_discovery_poller_pt 
poller, char *url, array_list_pt currentEndpoints);
 static celix_status_t 
endpointDiscoveryPoller_getEndpoints(endpoint_discovery_poller_pt poller, char 
*url, array_list_pt *updatedEndpoints);
 static celix_status_t endpointDiscoveryPoller_endpointDescriptionEquals(void 
*endpointPtr, void *comparePtr, bool *equals);
 
@@ -99,7 +101,7 @@ celix_status_t endpointDiscoveryPoller_create(discovery_pt 
discovery, bundle_con
         return CELIX_BUNDLE_EXCEPTION;
     }
 
-       status = celixThread_create(&(*poller)->pollerThread, NULL, 
endpointDiscoveryPoller_poll, *poller);
+       status = celixThread_create(&(*poller)->pollerThread, NULL, 
endpointDiscoveryPoller_performPeriodicPoll, *poller);
        if (status != CELIX_SUCCESS) {
                return status;
        }
@@ -187,6 +189,7 @@ celix_status_t 
endpointDiscoveryPoller_addDiscoveryEndpoint(endpoint_discovery_p
                if (status == CELIX_SUCCESS) {
                        logHelper_log(*poller->loghelper, 
OSGI_LOGSERVICE_DEBUG, "ENDPOINT_POLLER: add new discovery endpoint with url 
%s", url);
                        hashMap_put(poller->entries, strdup(url), endpoints);
+                       endpointDiscoveryPoller_poll(poller, url, endpoints);
                }
        }
 
@@ -234,80 +237,89 @@ celix_status_t 
endpointDiscoveryPoller_removeDiscoveryEndpoint(endpoint_discover
        return status;
 }
 
-static void *endpointDiscoveryPoller_poll(void *data) {
-    endpoint_discovery_poller_pt poller = (endpoint_discovery_poller_pt) data;
 
-    useconds_t interval = (useconds_t) (poller->poll_interval * 1000000L);
 
-    while (poller->running) {
-       usleep(interval);
 
-        celix_status_t status = celixThreadMutex_lock(&poller->pollerLock);
-        if (status != CELIX_SUCCESS) {
-               logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_WARNING, 
"ENDPOINT_POLLER: failed to obtain lock; retrying...");
-               continue;
-        }
+celix_status_t endpointDiscoveryPoller_poll(endpoint_discovery_poller_pt 
poller, char *url, array_list_pt currentEndpoints) {
+       celix_status_t status = NULL;
+       array_list_pt updatedEndpoints = NULL;
 
-               hash_map_iterator_pt iterator = 
hashMapIterator_create(poller->entries);
+       // create an arraylist with a custom equality test to ensure we can 
find endpoints properly...
+       
arrayList_createWithEquals(endpointDiscoveryPoller_endpointDescriptionEquals, 
&updatedEndpoints);
+       status = endpointDiscoveryPoller_getEndpoints(poller, url, 
&updatedEndpoints);
 
-               while (hashMapIterator_hasNext(iterator)) {
-                       hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iterator);
+       if (status != CELIX_SUCCESS) {
+               status = celixThreadMutex_unlock(&poller->pollerLock);
+       } else {
+               if (updatedEndpoints) {
+                       for (unsigned int i = arrayList_size(currentEndpoints); 
i > 0; i--) {
+                               endpoint_description_pt endpoint = 
arrayList_get(currentEndpoints, i - 1);
+
+                               if (!arrayList_contains(updatedEndpoints, 
endpoint)) {
+                                       status = 
discovery_removeDiscoveredEndpoint(poller->discovery, endpoint);
+                                       arrayList_remove(currentEndpoints, i - 
1);
+                                       endpointDescription_destroy(endpoint);
+                               }
+                       }
 
-                       char *url = hashMapEntry_getKey(entry);
-                       array_list_pt currentEndpoints = 
hashMapEntry_getValue(entry);
+                       for (int i = arrayList_size(updatedEndpoints); i > 0; 
i--) {
+                               endpoint_description_pt endpoint = 
arrayList_remove(updatedEndpoints, 0);
 
-                       array_list_pt updatedEndpoints = NULL;
-                       // create an arraylist with a custom equality test to 
ensure we can find endpoints properly...
-                       
arrayList_createWithEquals(endpointDiscoveryPoller_endpointDescriptionEquals, 
&updatedEndpoints);
-                       status = endpointDiscoveryPoller_getEndpoints(poller, 
url, &updatedEndpoints);
+                               if (!arrayList_contains(currentEndpoints, 
endpoint)) {
+                                       arrayList_add(currentEndpoints, 
endpoint);
+                                       status = 
discovery_addDiscoveredEndpoint(poller->discovery, endpoint);
+                               } else {
+                                       endpointDescription_destroy(endpoint);
 
-                       if (status != CELIX_SUCCESS) {
-                               status = 
celixThreadMutex_unlock(&poller->pollerLock);
-                               continue;
+                               }
                        }
+               }
 
-                       if (updatedEndpoints) {
-                               for (unsigned int i = 
arrayList_size(currentEndpoints); i > 0  ; i--) {
-                                       endpoint_description_pt endpoint = 
arrayList_get(currentEndpoints, i-1);
+               if (updatedEndpoints) {
+                       arrayList_destroy(updatedEndpoints);
+               }
+       }
 
-                                       if 
(!arrayList_contains(updatedEndpoints, endpoint)) {
-                                               status = 
discovery_removeDiscoveredEndpoint(poller->discovery, endpoint);
-                                               
arrayList_remove(currentEndpoints, i-1);
-                                               
endpointDescription_destroy(endpoint);
-                                       }
-                               }
+       return status;
+}
 
-                               for (int i = arrayList_size(updatedEndpoints); 
i > 0  ; i--) {
-                                       endpoint_description_pt endpoint = 
arrayList_remove(updatedEndpoints, 0);
+static void *endpointDiscoveryPoller_performPeriodicPoll(void *data) {
+       endpoint_discovery_poller_pt poller = (endpoint_discovery_poller_pt) 
data;
 
-                                       if 
(!arrayList_contains(currentEndpoints, endpoint)) {
-                                               arrayList_add(currentEndpoints, 
endpoint);
-                                               status = 
discovery_addDiscoveredEndpoint(poller->discovery, endpoint);
-                                       }
-                                       else {
-                                               
endpointDescription_destroy(endpoint);
+       useconds_t interval = (useconds_t) (poller->poll_interval * 1000000L);
 
-                                       }
-                               }
-                       }
+       while (poller->running) {
+               usleep(interval);
+               celix_status_t status = 
celixThreadMutex_lock(&poller->pollerLock);
+
+               if (status != CELIX_SUCCESS) {
+                       logHelper_log(*poller->loghelper, 
OSGI_LOGSERVICE_WARNING, "ENDPOINT_POLLER: failed to obtain lock; retrying...");
+               } else {
+                       hash_map_iterator_pt iterator = 
hashMapIterator_create(poller->entries);
+
+                       while (hashMapIterator_hasNext(iterator)) {
+                               hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iterator);
+
+                               char *url = hashMapEntry_getKey(entry);
+                               array_list_pt currentEndpoints = 
hashMapEntry_getValue(entry);
 
-                       if (updatedEndpoints) {
-                               arrayList_destroy(updatedEndpoints);
+                               endpointDiscoveryPoller_poll(poller, url, 
currentEndpoints);
                        }
 
+                       hashMapIterator_destroy(iterator);
                }
 
-               hashMapIterator_destroy(iterator);
-
                status = celixThreadMutex_unlock(&poller->pollerLock);
                if (status != CELIX_SUCCESS) {
                        logHelper_log(*poller->loghelper, 
OSGI_LOGSERVICE_WARNING, "ENDPOINT_POLLER: failed to release lock; 
retrying...");
                }
-    }
+       }
 
-    return NULL;
+       return NULL;
 }
 
+
+
 struct MemoryStruct {
   char *memory;
   size_t size;
@@ -366,7 +378,7 @@ static celix_status_t 
endpointDiscoveryPoller_getEndpoints(endpoint_discovery_po
                        endpointDescriptorReader_destroy(reader);
        }
     } else {
-       logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_ERROR, 
"ENDPOINT_POLLER: unable to read endpoints, reason: %s", 
curl_easy_strerror(res));
+       logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_ERROR, 
"ENDPOINT_POLLER: unable to read endpoints from %s, reason: %s", url, 
curl_easy_strerror(res));
     }
 
     // clean up endpoints file

Reply via email to