Repository: celix
Updated Branches:
  refs/heads/develop f32a23359 -> c396aeed1


CELIX-252: added separate hashmap to double-check whether expired key belongs 
to imported services


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/c396aeed
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/c396aeed
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/c396aeed

Branch: refs/heads/develop
Commit: c396aeed17b7618d5b2109a7a83f0c4213ce70f3
Parents: f32a233
Author: Bjoern Petri <[email protected]>
Authored: Sun Aug 23 21:42:50 2015 +0200
Committer: Bjoern Petri <[email protected]>
Committed: Sun Aug 23 21:42:50 2015 +0200

----------------------------------------------------------------------
 .../discovery_etcd/private/include/etcd.h       |   2 +-
 .../discovery_etcd/private/src/etcd.c           | 135 ++++++++++---------
 .../discovery_etcd/private/src/etcd_watcher.c   |  69 ++++++++--
 3 files changed, 136 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/c396aeed/remote_services/discovery_etcd/private/include/etcd.h
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/include/etcd.h 
b/remote_services/discovery_etcd/private/include/etcd.h
index e36fccb..f5624d0 100644
--- a/remote_services/discovery_etcd/private/include/etcd.h
+++ b/remote_services/discovery_etcd/private/include/etcd.h
@@ -54,6 +54,6 @@ bool etcd_get(char* key, char* value, char*action, int* 
modifiedIndex);
 bool etcd_getNodes(char* directory, char** nodeNames, int* size);
 bool etcd_set(char* key, char* value, int ttl, bool prevExist);
 bool etcd_del(char* key);
-bool etcd_watch(char* key, int index, char* action, char* prevValue, char* 
value);
+bool etcd_watch(char* key, int index, char* action, char* prevValue, char* 
value, char* rkey, int *modifiedIndex);
 
 #endif /* ETCD_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/c396aeed/remote_services/discovery_etcd/private/src/etcd.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/src/etcd.c 
b/remote_services/discovery_etcd/private/src/etcd.c
index 2c74856..d38f6bd 100644
--- a/remote_services/discovery_etcd/private/src/etcd.c
+++ b/remote_services/discovery_etcd/private/src/etcd.c
@@ -304,66 +304,79 @@ bool etcd_del(char* key) {
 }
 
 ///watch
-bool etcd_watch(char* key, int index, char* action, char* prevValue, char* 
value) {
-       json_error_t error;
-       json_t* js_root = NULL;
-       json_t* js_node = NULL;
-       json_t* js_prevNode = NULL;
-       json_t* js_action = NULL;
-       json_t* js_value = NULL;
-       json_t* js_prevValue = NULL;
-       bool retVal = false;
-       char url[MAX_URL_LENGTH];
-       int res;
-       struct MemoryStruct reply;
-
-       reply.memory = malloc(1); /* will be grown as needed by the realloc 
above */
-       reply.size = 0; /* no data at this point */
 
-       if (index != 0)
-               snprintf(url, MAX_URL_LENGTH, 
"http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d", etcd_server, 
etcd_port, key,
-                               index);
-       else
-               snprintf(url, MAX_URL_LENGTH, 
"http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, 
key);
-
-       res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) 
&reply);
-
-       if (res == CURLE_OK) {
-               js_root = json_loads(reply.memory, 0, &error);
-
-               if (js_root != NULL) {
-                       js_action = json_object_get(js_root, ETCD_JSON_ACTION);
-                       js_node = json_object_get(js_root, ETCD_JSON_NODE);
-                       js_prevNode = json_object_get(js_root, 
ETCD_JSON_PREVNODE);
-               }
-               if (js_prevNode != NULL) {
-                       js_prevValue = json_object_get(js_prevNode, 
ETCD_JSON_VALUE);
-               }
-               if (js_node != NULL) {
-                       js_value = json_object_get(js_node, ETCD_JSON_VALUE);
-               }
-               if (js_prevNode != NULL) {
-                       js_prevValue = json_object_get(js_prevNode, 
ETCD_JSON_VALUE);
-               }
-               if ((js_prevValue != NULL) && (json_is_string(js_prevValue))) {
-                       strncpy(prevValue, json_string_value(js_prevValue), 
MAX_VALUE_LENGTH);
-               }
-               if ((js_value != NULL) && (json_is_string(js_value))) {
-                       strncpy(value, json_string_value(js_value), 
MAX_VALUE_LENGTH);
-               }
-               if ((js_action != NULL) && (json_is_string(js_action))) {
-                       strncpy(action, json_string_value(js_action), 
MAX_ACTION_LENGTH);
-
-                       retVal = true;
-               }
-               if (js_root != NULL) {
-                       json_decref(js_root);
-               }
-       }
-
-       if (reply.memory) {
-               free(reply.memory);
-       }
-
-       return retVal;
+bool etcd_watch(char* key, int index, char* action, char* prevValue, char* 
value, char* rkey, int* modifiedIndex) {
+    json_error_t error;
+    json_t* js_root = NULL;
+    json_t* js_node = NULL;
+    json_t* js_prevNode = NULL;
+    json_t* js_action = NULL;
+    json_t* js_value = NULL;
+    json_t* js_rkey = NULL;
+    json_t* js_prevValue = NULL;
+    json_t* js_modIndex = NULL;
+    bool retVal = false;
+    char url[MAX_URL_LENGTH];
+    int res;
+    struct MemoryStruct reply;
+
+    reply.memory = malloc(1); /* will be grown as needed by the realloc above 
*/
+    reply.size = 0; /* no data at this point */
+
+    if (index != 0)
+        snprintf(url, MAX_URL_LENGTH, 
"http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d", etcd_server, 
etcd_port, key, index);
+    else
+        snprintf(url, MAX_URL_LENGTH, 
"http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, 
key);
+
+    res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
+
+    if (res == CURLE_OK) {
+
+        js_root = json_loads(reply.memory, 0, &error);
+
+        if (js_root != NULL) {
+            js_action = json_object_get(js_root, ETCD_JSON_ACTION);
+            js_node = json_object_get(js_root, ETCD_JSON_NODE);
+            js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
+        }
+        if (js_prevNode != NULL) {
+            js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
+        }
+        if (js_node != NULL) {
+            js_rkey = json_object_get(js_node, ETCD_JSON_KEY);
+            js_value = json_object_get(js_node, ETCD_JSON_VALUE);
+            js_modIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX);
+        }
+        if (js_prevNode != NULL) {
+            js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
+        }
+        if ((js_prevValue != NULL) && (json_is_string(js_prevValue))) {
+            strncpy(prevValue, json_string_value(js_prevValue), 
MAX_VALUE_LENGTH);
+        }
+        if ((js_value != NULL) && (json_is_string(js_value))) {
+            strncpy(value, json_string_value(js_value), MAX_VALUE_LENGTH);
+        }
+        if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) {
+            *modifiedIndex = json_integer_value(js_modIndex);
+        } else {
+            *modifiedIndex = index;
+        }
+
+        if ((js_rkey != NULL) && (js_action != NULL) && 
(json_is_string(js_rkey)) && (json_is_string(js_action))) {
+            strncpy(rkey, json_string_value(js_rkey), MAX_KEY_LENGTH);
+            strncpy(action, json_string_value(js_action), MAX_ACTION_LENGTH);
+
+            retVal = true;
+        }
+        if (js_root != NULL) {
+            json_decref(js_root);
+        }
+
+    }
+
+    if (reply.memory) {
+        free(reply.memory);
+    }
+
+    return retVal;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/c396aeed/remote_services/discovery_etcd/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/src/etcd_watcher.c 
b/remote_services/discovery_etcd/private/src/etcd_watcher.c
index eefd28f..89be84e 100644
--- a/remote_services/discovery_etcd/private/src/etcd_watcher.c
+++ b/remote_services/discovery_etcd/private/src/etcd_watcher.c
@@ -31,6 +31,7 @@
 #include "log_helper.h"
 #include "log_service.h"
 #include "constants.h"
+#include "utils.h"
 #include "discovery.h"
 #include "discovery_impl.h"
 
@@ -42,6 +43,7 @@
 struct etcd_watcher {
     discovery_pt discovery;
     log_helper_pt* loghelper;
+    hash_map_pt entries;
 
        celix_thread_mutex_t watcherLock;
        celix_thread_t watcherThread;
@@ -200,6 +202,52 @@ static celix_status_t 
etcdWatcher_addOwnFramework(etcd_watcher_pt watcher)
     return status;
 }
 
+
+
+
+static celix_status_t etcdWatcher_addEntry(etcd_watcher_pt watcher, char* key, 
char* value) {
+    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+       endpoint_discovery_poller_pt poller = watcher->discovery->poller;
+
+       if (!hashMap_containsKey(watcher->entries, key)) {
+               status = endpointDiscoveryPoller_addDiscoveryEndpoint(poller, 
value);
+
+               if (status == CELIX_SUCCESS) {
+                       hashMap_put(watcher->entries, key, value);
+               }
+       }
+
+       return status;
+}
+
+
+static celix_status_t etcdWatcher_removeEntry(etcd_watcher_pt watcher, char* 
key, char* value) {
+    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+       endpoint_discovery_poller_pt poller = watcher->discovery->poller;
+
+       if (hashMap_containsKey(watcher->entries, key)) {
+
+               hashMap_remove(watcher->entries, key);
+
+               // check if there is another entry with the same value
+               hash_map_iterator_pt iter = 
hashMapIterator_create(watcher->entries);
+               unsigned int valueFound = 0;
+
+               while (hashMapIterator_hasNext(iter) && valueFound <= 1) {
+                       if (strcmp(value, hashMapIterator_nextValue(iter)) == 0)
+                               valueFound++;
+               }
+
+               if (valueFound == 0)
+                       status = 
endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, value);
+
+       }
+
+       return status;
+
+}
+
+
 /*
  * performs (blocking) etcd_watch calls to check for
  * changing discovery endpoint information within etcd.
@@ -211,29 +259,32 @@ static void* etcdWatcher_run(void* data) {
        int highestModified = 0;
 
        bundle_context_pt context = watcher->discovery->context;
-       endpoint_discovery_poller_pt poller = watcher->discovery->poller;
 
        etcdWatcher_addAlreadyExistingWatchpoints(watcher->discovery, 
&highestModified);
        etcdWatcher_getRootPath(context, &rootPath[0]);
 
        while (watcher->running) {
+
+        char rkey[MAX_KEY_LENGTH];
                char value[MAX_VALUE_LENGTH];
                char preValue[MAX_VALUE_LENGTH];
                char action[MAX_ACTION_LENGTH];
+        int modIndex;
 
-               if (etcd_watch(rootPath, highestModified+1, &action[0], 
&preValue[0], &value[0]) == true) {
+               if (etcd_watch(rootPath, highestModified + 1, &action[0], 
&preValue[0], &value[0], &rkey[0], &modIndex) == true) {
                        if (strcmp(action, "set") == 0) {
-                               
endpointDiscoveryPoller_addDiscoveryEndpoint(poller, strdup(&value[0]));
+                               etcdWatcher_addEntry(watcher, &rkey[0], 
&value[0]);
                        } else if (strcmp(action, "delete") == 0) {
-                               
endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, &preValue[0]);
+                               etcdWatcher_removeEntry(watcher, &rkey[0], 
&value[0]);
                        } else if (strcmp(action, "expire") == 0) {
-                               
endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, &preValue[0]);
+                               etcdWatcher_removeEntry(watcher, &rkey[0], 
&value[0]);
                        } else if (strcmp(action, "update") == 0) {
-                               // TODO
+                               etcdWatcher_addEntry(watcher, &rkey[0], 
&value[0]);
                        } else {
                                logHelper_log(*watcher->loghelper, 
OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action);
                        }
-                       highestModified++;
+
+                       highestModified = modIndex;
                }
 
                // update own framework uuid
@@ -263,7 +314,6 @@ celix_status_t etcdWatcher_create(discovery_pt discovery, 
bundle_context_pt cont
                return CELIX_BUNDLE_EXCEPTION;
        }
 
-
        (*watcher) = calloc(1, sizeof(struct etcd_watcher));
        if (!*watcher) {
                return CELIX_ENOMEM;
@@ -272,6 +322,7 @@ celix_status_t etcdWatcher_create(discovery_pt discovery, 
bundle_context_pt cont
        {
                (*watcher)->discovery = discovery;
                (*watcher)->loghelper = &discovery->loghelper;
+               (*watcher)->entries = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
        }
 
        if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_IP, 
&etcd_server) != CELIX_SUCCESS) || !etcd_server) {
@@ -338,6 +389,8 @@ celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) 
{
 
        watcher->loghelper = NULL;
 
+       hashMap_destroy(watcher->entries, true, true);
+
        free(watcher);
 
        return status;

Reply via email to