Repository: celix
Updated Branches:
  refs/heads/develop 4e665476c -> d62731a3c


CELIX-395: discovery_etcd using etcdlib now


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/d62731a3
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/d62731a3
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/d62731a3

Branch: refs/heads/develop
Commit: d62731a3cfe482d6796fea3fbd386091756f3b2a
Parents: 4e66547
Author: Roy Lenferink <lenferink...@gmail.com>
Authored: Fri Jan 27 10:27:48 2017 +0100
Committer: Roy Lenferink <lenferink...@gmail.com>
Committed: Fri Jan 27 11:35:28 2017 +0100

----------------------------------------------------------------------
 remote_services/discovery_etcd/CMakeLists.txt   |   4 +-
 .../private/include/discovery_impl.h            |   4 +-
 .../discovery_etcd/private/include/etcd.h       |  61 ---
 .../discovery_etcd/private/src/etcd.c           | 397 -------------------
 .../discovery_etcd/private/src/etcd_watcher.c   | 168 ++++----
 remote_services/examples/CMakeLists.txt         |   4 +-
 6 files changed, 83 insertions(+), 555 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/CMakeLists.txt 
b/remote_services/discovery_etcd/CMakeLists.txt
index 31ba269..442d486 100644
--- a/remote_services/discovery_etcd/CMakeLists.txt
+++ b/remote_services/discovery_etcd/CMakeLists.txt
@@ -24,6 +24,7 @@ if (RSA_DISCOVERY_ETCD)
        include_directories("${CURL_INCLUDE_DIR}")
        include_directories("${JANSSON_INCLUDE_DIR}")
        include_directories("${LIBXML2_INCLUDE_DIR}")
+       include_directories("${PROJECT_SOURCE_DIR}/etcdlib/public/include")
        include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
        
include_directories("${PROJECT_SOURCE_DIR}/remote_services/utils/private/include")
        
include_directories("${PROJECT_SOURCE_DIR}/remote_services/utils/public/include")
@@ -40,7 +41,6 @@ if (RSA_DISCOVERY_ETCD)
         NAME "Apache Celix RSA Discovery ETCD"
         SOURCES
                private/src/discovery_impl.c
-               private/src/etcd.c
            private/src/etcd_watcher.c
                
${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery_activator.c
                
${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery.c
@@ -56,6 +56,6 @@ if (RSA_DISCOVERY_ETCD)
 
        install_bundle(discovery_etcd)
                
-       target_link_libraries(discovery_etcd celix_framework ${CURL_LIBRARIES} 
${LIBXML2_LIBRARIES} ${JANSSON_LIBRARIES})
+       target_link_libraries(discovery_etcd celix_framework etcdlib 
${CURL_LIBRARIES} ${LIBXML2_LIBRARIES} ${JANSSON_LIBRARIES})
 
 endif (RSA_DISCOVERY_ETCD)

http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/include/discovery_impl.h
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/include/discovery_impl.h 
b/remote_services/discovery_etcd/private/include/discovery_impl.h
index e7e1071..a19b145 100644
--- a/remote_services/discovery_etcd/private/include/discovery_impl.h
+++ b/remote_services/discovery_etcd/private/include/discovery_impl.h
@@ -45,9 +45,7 @@
 
 #define DEFAULT_POLL_ENDPOINTS ""
 
-#define MAX_ROOTNODE_LENGTH             64
-#define MAX_LOCALNODE_LENGTH   256
-
+#define FREE_MEM(ptr) if(ptr) {free(ptr); ptr = NULL;}
 
 struct discovery {
        bundle_context_pt context;

http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/include/etcd.h
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/include/etcd.h 
b/remote_services/discovery_etcd/private/include/etcd.h
deleted file mode 100644
index 2ba09de..0000000
--- a/remote_services/discovery_etcd/private/include/etcd.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0 
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/*
- * etcd.h
- *
- *  \date       26 Jul 2014
- *  \author     <a href="mailto:d...@celix.apache.org";>Apache Celix Project 
Team</a>
- *  \copyright  Apache License, Version 2.0
- */
-
-
-#ifndef ETCD_H_
-#define ETCD_H_
-
-#include <stdbool.h>
-#include <celix_errno.h>
-
-#define MAX_NODES                      256
-
-#define MAX_KEY_LENGTH         256
-#define MAX_VALUE_LENGTH       256
-#define MAX_ACTION_LENGTH      64
-
-#define MAX_URL_LENGTH         256
-#define MAX_CONTENT_LENGTH     1024
-
-#define ETCD_JSON_NODE                 "node"
-#define ETCD_JSON_PREVNODE             "prevNode"
-#define ETCD_JSON_NODES                        "nodes"
-#define ETCD_JSON_ACTION               "action"
-#define ETCD_JSON_KEY                  "key"
-#define ETCD_JSON_VALUE                        "value"
-#define ETCD_JSON_MODIFIEDINDEX        "modifiedIndex"
-#define ETCD_ERROR_INDICATION   "errorCode"
-#define ETCD_INDEX                             "index"
-
-celix_status_t etcd_init(char* server, int port);
-bool etcd_get(char* key, char* value, char*action, int* modifiedIndex);
-bool etcd_getNodes(char* directory, char** nodeNames, int* size);
-bool etcd_set(char* key, char* value, int ttl, bool prevExist);
-bool etcd_del(char* key);
-bool etcd_watch(char* key, int index, char* action, char* prevValue, char* 
value, char* rkey, int *modifiedIndex, int *error);
-
-#endif /* ETCD_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/src/etcd.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/src/etcd.c 
b/remote_services/discovery_etcd/private/src/etcd.c
deleted file mode 100644
index 1b74f66..0000000
--- a/remote_services/discovery_etcd/private/src/etcd.c
+++ /dev/null
@@ -1,397 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0 
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/*
- * etcd.c
- *
- *  \date       26 Jul 2014
- *  \author     <a href="mailto:d...@celix.apache.org";>Apache Celix Project 
Team</a>
- *  \copyright  Apache License, Version 2.0
- */
-
-
-#include <stdio.h>
-#include <stdbool.h>
-#include <string.h>
-
-#include <curl/curl.h>
-#include <jansson.h>
-
-#include "etcd.h"
-
-#define DEFAULT_CURL_TIMEOUT          10
-#define DEFAULT_CURL_CONECTTIMEOUT    10
-
-typedef enum {
-       GET, PUT, DELETE
-} request_t;
-
-static char* etcd_server = NULL;
-static int etcd_port = 0;
-
-struct MemoryStruct {
-       char *memory;
-       size_t size;
-};
-
-static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, 
void *userp) {
-       size_t realsize = size * nmemb;
-       struct MemoryStruct *mem = (struct MemoryStruct *) userp;
-
-       mem->memory = realloc(mem->memory, mem->size + realsize + 1);
-       if (mem->memory == NULL) {
-               /* out of memory! */
-               printf("not enough memory (realloc returned NULL)\n");
-               return 0;
-       }
-
-       memcpy(&(mem->memory[mem->size]), contents, realsize);
-       mem->size += realsize;
-       mem->memory[mem->size] = 0;
-
-       return realsize;
-}
-
-static int performRequest(char* url, request_t request, void* callback, void* 
reqData, void* repData) {
-       CURL *curl = NULL;
-       CURLcode res = 0;
-
-       curl = curl_easy_init();
-       curl_easy_setopt(curl, CURLOPT_TIMEOUT, DEFAULT_CURL_TIMEOUT);
-       curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 
DEFAULT_CURL_CONECTTIMEOUT);
-       curl_easy_setopt(curl, CURLOPT_URL, url);
-       curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
-       curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
-       curl_easy_setopt(curl, CURLOPT_WRITEDATA, repData);
-       curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
-
-       if (request == PUT) {
-               curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
-               curl_easy_setopt(curl, CURLOPT_POST, 1L);
-//             curl_easy_setopt(curl, CURLOPT_HTTPHEADER, "Content-type: 
application/json");
-               curl_easy_setopt(curl, CURLOPT_POSTFIELDS, reqData);
-       } else if (request == DELETE) {
-               curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
-       } else if (request == GET) {
-               curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET");
-       }
-
-       res = curl_easy_perform(curl);
-       curl_easy_cleanup(curl);
-
-       return res;
-}
-
-// open
-celix_status_t etcd_init(char* server, int port) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       etcd_server = server;
-       etcd_port = port;
-
-       return status;
-}
-
-// get
-bool etcd_get(char* key, char* value, char* action, int* modifiedIndex) {
-       json_t* js_root = NULL;
-       json_t* js_node = NULL;
-       json_t* js_value = NULL;
-       json_t* js_modifiedIndex = NULL;
-       json_error_t error;
-       int res;
-       struct MemoryStruct reply;
-
-       reply.memory = malloc(1); /* will be grown as needed by the realloc 
above */
-       reply.size = 0; /* no data at this point */
-
-       bool retVal = false;
-       char url[MAX_URL_LENGTH];
-       snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s";, etcd_server, 
etcd_port, key);
-
-       res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) 
&reply);
-
-       if (res == CURLE_OK) {
-               js_root = json_loads(reply.memory, 0, &error);
-
-               if (js_root != NULL) {
-                       js_node = json_object_get(js_root, ETCD_JSON_NODE);
-               }
-               if (js_node != NULL) {
-                       js_value = json_object_get(js_node, ETCD_JSON_VALUE);
-                       js_modifiedIndex = json_object_get(js_node, 
ETCD_JSON_MODIFIEDINDEX);
-
-                       if (js_modifiedIndex != NULL) {
-                               *modifiedIndex = 
json_integer_value(js_modifiedIndex);
-                       }
-
-                       if (js_value != NULL) {
-                               snprintf(value, MAX_VALUE_LENGTH, "%s", 
json_string_value(js_value));
-                               retVal = true;
-                       }
-               }
-               if (js_root != NULL) {
-                       json_decref(js_root);
-               }
-       }
-
-       if (reply.memory) {
-               free(reply.memory);
-       }
-
-
-       return retVal;
-}
-
-// getNodes
-bool etcd_getNodes(char* directory, char** nodeNames, int* size) {
-       json_t* js_root = NULL;
-       json_t* js_node = NULL;
-       json_t* js_nodes = NULL;
-       json_error_t error;
-       int res;
-       struct MemoryStruct reply;
-
-       reply.memory = malloc(1); /* will be grown as needed by the realloc 
above */
-       reply.size = 0; /* no data at this point */
-
-       bool retVal = false;
-       char url[MAX_URL_LENGTH];
-       snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s";, etcd_server, 
etcd_port, directory);
-
-       res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) 
&reply);
-
-       if (res == CURLE_OK) {
-               js_root = json_loads(reply.memory, 0, &error);
-
-               if (js_root != NULL) {
-                       js_node = json_object_get(js_root, ETCD_JSON_NODE);
-               }
-               if (js_node != NULL) {
-                       js_nodes = json_object_get(js_node, ETCD_JSON_NODES);
-               }
-
-               if (js_nodes != NULL && json_is_array(js_nodes)) {
-                       int i = 0;
-                       retVal = true;
-
-                       for (i = 0; i < json_array_size(js_nodes) && i < 
MAX_NODES; i++) {
-                               json_t* js_node = json_array_get(js_nodes, i);
-
-                               if (!json_is_object(js_node)) {
-                                       retVal = false;
-                               } else {
-                                       json_t* js_key = 
json_object_get(js_node, ETCD_JSON_KEY);
-                                       snprintf(nodeNames[i], MAX_KEY_LENGTH, 
"%s", json_string_value(js_key));
-                               }
-                       }
-                       *size = i;
-               }
-               if (js_root != NULL) {
-                       json_decref(js_root);
-               }
-       }
-
-       if (reply.memory) {
-               free(reply.memory);
-       }
-
-       return retVal;
-}
-
-
-
-bool etcd_set(char* key, char* value, int ttl, bool prevExist) {
-       json_error_t error;
-       json_t* js_root = NULL;
-       json_t* js_node = NULL;
-       json_t* js_value = NULL;
-       bool retVal = false;
-       char url[MAX_URL_LENGTH];
-       char request[MAX_CONTENT_LENGTH];
-       char* cur = request;
-       int res;
-       struct MemoryStruct reply;
-
-       reply.memory = malloc(1); /* will be grown as needed by the realloc 
above */
-       reply.size = 0; /* no data at this point */
-
-       snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s";, etcd_server, 
etcd_port, key);
-       cur += snprintf(cur, MAX_CONTENT_LENGTH, "value=%s", value);
-
-       if (ttl > 0)
-           cur += snprintf(cur, MAX_CONTENT_LENGTH, ";ttl=%d", ttl);
-
-       if (prevExist)
-           cur += snprintf(cur, MAX_CONTENT_LENGTH, ";prevExist=true");
-
-       res = performRequest(url, PUT, WriteMemoryCallback, request, (void*) 
&reply);
-
-       if (res == CURLE_OK) {
-               js_root = json_loads(reply.memory, 0, &error);
-
-               if (js_root != NULL) {
-                       js_node = json_object_get(js_root, ETCD_JSON_NODE);
-               }
-               if (js_node != NULL) {
-                       js_value = json_object_get(js_node, ETCD_JSON_VALUE);
-               }
-               if (js_value != NULL && json_is_string(js_value)) {
-                       retVal = (strcmp(json_string_value(js_value), value) == 
0);
-               }
-               if (js_root != NULL) {
-                       json_decref(js_root);
-               }
-       }
-
-       if (reply.memory) {
-               free(reply.memory);
-       }
-
-       return retVal;
-}
-
-
-
-//delete
-bool etcd_del(char* key) {
-       json_error_t error;
-       json_t* js_root = NULL;
-       json_t* js_node = NULL;
-       bool retVal = false;
-       char url[MAX_URL_LENGTH];
-       char request[MAX_CONTENT_LENGTH];
-       int res;
-       struct MemoryStruct reply;
-
-       reply.memory = malloc(1); /* will be grown as needed by the realloc 
above */
-       reply.size = 0; /* no data at this point */
-
-       snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s";, etcd_server, 
etcd_port, key);
-       res = performRequest(url, DELETE, WriteMemoryCallback, request, (void*) 
&reply);
-
-       if (res == CURLE_OK) {
-               js_root = json_loads(reply.memory, 0, &error);
-
-               if (js_root != NULL) {
-                       js_node = json_object_get(js_root, ETCD_JSON_NODE);
-               }
-
-               retVal = (js_node != NULL);
-
-               if (js_root != NULL) {
-                       json_decref(js_root);
-               }
-       }
-
-       if (reply.memory) {
-               free(reply.memory);
-       }
-
-
-       return retVal;
-}
-
-///watch
-
-bool etcd_watch(char* key, int index, char* action, char* prevValue, char* 
value, char* rkey, int* modifiedIndex, int* errorCode) {
-    json_error_t error;
-    json_t* js_root = NULL;
-    json_t* js_node = NULL;
-    json_t* js_prevNode = NULL;
-    json_t* js_action = NULL;
-    json_t* js_value = NULL;
-    json_t* js_rkey = NULL;
-    json_t* js_prevValue = NULL;
-    json_t* js_modIndex = NULL;
-    json_t* js_error = NULL;   // used to indicate valid json response with 
ETCD error indication
-    bool retVal = false;
-    *errorCode = 0;
-    char url[MAX_URL_LENGTH];
-    int res;
-    struct MemoryStruct reply;
-
-    reply.memory = malloc(1); /* will be grown as needed by the realloc above 
*/
-    reply.size = 0; /* no data at this point */
-
-    if (index != 0)
-        snprintf(url, MAX_URL_LENGTH, 
"http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d";, etcd_server, 
etcd_port, key, index);
-    else
-        snprintf(url, MAX_URL_LENGTH, 
"http://%s:%d/v2/keys/%s?wait=true&recursive=true";, etcd_server, etcd_port, 
key);
-
-    res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
-
-    if (res == CURLE_OK) {
-
-        js_root = json_loads(reply.memory, 0, &error);
-
-        if (js_root != NULL) {
-            js_action = json_object_get(js_root, ETCD_JSON_ACTION);
-            js_node = json_object_get(js_root, ETCD_JSON_NODE);
-            js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
-            js_error = json_object_get(js_root, ETCD_ERROR_INDICATION);
-        }
-        if (js_prevNode != NULL) {
-            js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
-        }
-        if (js_node != NULL) {
-            js_rkey = json_object_get(js_node, ETCD_JSON_KEY);
-            js_value = json_object_get(js_node, ETCD_JSON_VALUE);
-            js_modIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX);
-        }
-        if (js_prevNode != NULL) {
-            js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
-        }
-        if ((js_prevValue != NULL) && (json_is_string(js_prevValue))) {
-            strncpy(prevValue, json_string_value(js_prevValue), 
MAX_VALUE_LENGTH);
-        }
-        if ((js_value != NULL) && (json_is_string(js_value))) {
-            strncpy(value, json_string_value(js_value), MAX_VALUE_LENGTH);
-        }
-        if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) {
-            *modifiedIndex = json_integer_value(js_modIndex);
-        } else {
-            *modifiedIndex = index;
-        }
-
-        if ((js_rkey != NULL) && (js_action != NULL) && 
(json_is_string(js_rkey)) && (json_is_string(js_action))) {
-            strncpy(rkey, json_string_value(js_rkey), MAX_KEY_LENGTH);
-            strncpy(action, json_string_value(js_action), MAX_ACTION_LENGTH);
-
-            retVal = true;
-        }
-        if ((js_error != NULL) && (json_is_integer(js_error))) {
-               *errorCode = json_integer_value(js_error);
-               js_modIndex = json_object_get(js_root, ETCD_INDEX);
-               if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) {
-                   *modifiedIndex = json_integer_value(js_modIndex);
-               }
-        }
-        if (js_root != NULL) {
-            json_decref(js_root);
-        }
-
-    }
-
-    if (reply.memory) {
-        free(reply.memory);
-    }
-
-    return retVal;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/src/etcd_watcher.c 
b/remote_services/discovery_etcd/private/src/etcd_watcher.c
index 7e1ce33..a09002a 100644
--- a/remote_services/discovery_etcd/private/src/etcd_watcher.c
+++ b/remote_services/discovery_etcd/private/src/etcd_watcher.c
@@ -26,6 +26,7 @@
 
 #include <stdbool.h>
 #include <stdlib.h>
+#include <unistd.h>
 #include <string.h>
 
 #include "log_helper.h"
@@ -35,6 +36,7 @@
 #include "discovery.h"
 #include "discovery_impl.h"
 
+#include <curl/curl.h>
 #include "etcd.h"
 #include "etcd_watcher.h"
 
@@ -51,18 +53,23 @@ struct etcd_watcher {
        volatile bool running;
 };
 
-#define CFG_ETCD_ROOT_PATH             "DISCOVERY_ETCD_ROOT_PATH"
-#define DEFAULT_ETCD_ROOTPATH  "discovery"
 
-#define CFG_ETCD_SERVER_IP             "DISCOVERY_ETCD_SERVER_IP"
-#define DEFAULT_ETCD_SERVER_IP "127.0.0.1"
+#define MAX_ROOTNODE_LENGTH                    128
+#define MAX_LOCALNODE_LENGTH           4096
+#define MAX_VALUE_LENGTH                       256
 
-#define CFG_ETCD_SERVER_PORT   "DISCOVERY_ETCD_SERVER_PORT"
-#define DEFAULT_ETCD_SERVER_PORT 2379
+#define CFG_ETCD_ROOT_PATH                     "DISCOVERY_ETCD_ROOT_PATH"
+#define DEFAULT_ETCD_ROOTPATH          "discovery"
+
+#define CFG_ETCD_SERVER_IP                     "DISCOVERY_ETCD_SERVER_IP"
+#define DEFAULT_ETCD_SERVER_IP         "127.0.0.1"
+
+#define CFG_ETCD_SERVER_PORT           "DISCOVERY_ETCD_SERVER_PORT"
+#define DEFAULT_ETCD_SERVER_PORT       2379
 
 // be careful - this should be higher than the curl timeout
-#define CFG_ETCD_TTL   "DISCOVERY_ETCD_TTL"
-#define DEFAULT_ETCD_TTL 30
+#define CFG_ETCD_TTL                                   "DISCOVERY_ETCD_TTL"
+#define DEFAULT_ETCD_TTL                       30
 
 
 // note that the rootNode shouldn't have a leading slash
@@ -71,36 +78,41 @@ static celix_status_t 
etcdWatcher_getRootPath(bundle_context_pt context, char* r
        const char* rootPath = NULL;
 
        if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, 
&rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
-               strcpy(rootNode, DEFAULT_ETCD_ROOTPATH);
+               strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH);
        }
        else {
-               strcpy(rootNode, rootPath);
+               strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH);
        }
 
        return status;
 }
 
-
 static celix_status_t etcdWatcher_getLocalNodePath(bundle_context_pt context, 
char* localNodePath) {
        celix_status_t status = CELIX_SUCCESS;
        char rootPath[MAX_ROOTNODE_LENGTH];
     const char* uuid = NULL;
 
-    if ((etcdWatcher_getRootPath(context, &rootPath[0]) != CELIX_SUCCESS)) {
+    if ((etcdWatcher_getRootPath(context, rootPath) != CELIX_SUCCESS)) {
                status = CELIX_ILLEGAL_STATE;
     }
        else if (((bundleContext_getProperty(context, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, &uuid)) != CELIX_SUCCESS) || (!uuid)) {
                status = CELIX_ILLEGAL_STATE;
        }
-       else if (rootPath[strlen(&rootPath[0]) - 1] == '/') {
-       snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s%s", &rootPath[0], 
uuid);
+       else if (rootPath[strlen(rootPath) - 1] == '/') {
+       snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s%s", rootPath, uuid);
     }
     else {
-       snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s/%s", &rootPath[0], 
uuid);
+       snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s/%s", rootPath, uuid);
     }
 
     return status;
 }
+
+static void add_node(const char *key, const char *value, void* arg) {
+       discovery_pt discovery = (discovery_pt) arg;
+       endpointDiscoveryPoller_addDiscoveryEndpoint(discovery->poller, (char 
*) value);
+}
+
 /*
  * retrieves all already existing discovery endpoints
  * from etcd and adds them to the poller.
@@ -108,44 +120,18 @@ static celix_status_t 
etcdWatcher_getLocalNodePath(bundle_context_pt context, ch
  * returns the modifiedIndex of the last modified
  * discovery endpoint (see etcd documentation).
  */
-static celix_status_t etcdWatcher_addAlreadyExistingWatchpoints(discovery_pt 
discovery, int* highestModified) {
+static celix_status_t etcdWatcher_addAlreadyExistingWatchpoints(discovery_pt 
discovery, long long* highestModified) {
        celix_status_t status = CELIX_SUCCESS;
-       char** nodeArr = calloc(MAX_NODES, sizeof(*nodeArr));
-       char rootPath[MAX_ROOTNODE_LENGTH];
-       int i, size;
-
-       *highestModified = -1;
 
-       for (i = 0; i < MAX_NODES; i++) {
-               nodeArr[i] = calloc(MAX_KEY_LENGTH, sizeof(*nodeArr[i]));
-       }
+       char rootPath[MAX_ROOTNODE_LENGTH];
+       status = etcdWatcher_getRootPath(discovery->context, rootPath);
 
-       // we need to go though all nodes and get the highest modifiedIndex
-       if (((status = etcdWatcher_getRootPath(discovery->context, 
&rootPath[0])) == CELIX_SUCCESS) &&
-                (etcd_getNodes(rootPath, nodeArr, &size) == true)) {
-               for (i = 0; i < size; i++) {
-                       char* key = nodeArr[i];
-                       char value[MAX_VALUE_LENGTH];
-                       char action[MAX_VALUE_LENGTH];
-                       int modIndex;
-
-                       if (etcd_get(key, &value[0], &action[0], &modIndex) == 
true) {
-                               // TODO: check that this is not equals to the 
local endpoint
-                               
endpointDiscoveryPoller_addDiscoveryEndpoint(discovery->poller, &value[0]);
-
-                               if (modIndex > *highestModified) {
-                                       *highestModified = modIndex;
-                               }
-                       }
+       if (status == CELIX_SUCCESS) {
+               if(etcd_get_directory(rootPath, add_node, discovery, 
highestModified)) {
+                           status = CELIX_ILLEGAL_ARGUMENT;
                }
        }
 
-       for (i = 0; i < MAX_NODES; i++) {
-               free(nodeArr[i]);
-       }
-
-       free(nodeArr);
-
        return status;
 }
 
@@ -154,8 +140,7 @@ static celix_status_t 
etcdWatcher_addOwnFramework(etcd_watcher_pt watcher)
 {
     celix_status_t status = CELIX_BUNDLE_EXCEPTION;
     char localNodePath[MAX_LOCALNODE_LENGTH];
-    char value[MAX_VALUE_LENGTH];
-    char action[MAX_VALUE_LENGTH];
+    char *value;
        char url[MAX_VALUE_LENGTH];
     int modIndex;
     char* endpoints = NULL;
@@ -166,30 +151,30 @@ static celix_status_t 
etcdWatcher_addOwnFramework(etcd_watcher_pt watcher)
        endpoint_discovery_server_pt server = watcher->discovery->server;
 
     // register own framework
-    if ((status = etcdWatcher_getLocalNodePath(context, &localNodePath[0])) != 
CELIX_SUCCESS) {
+    if ((status = etcdWatcher_getLocalNodePath(context, localNodePath)) != 
CELIX_SUCCESS) {
         return status;
     }
 
-       if (endpointDiscoveryServer_getUrl(server, &url[0]) != CELIX_SUCCESS) {
+       if (endpointDiscoveryServer_getUrl(server, url) != CELIX_SUCCESS) {
                snprintf(url, MAX_VALUE_LENGTH, "http://%s:%s/%s";, 
DEFAULT_SERVER_IP, DEFAULT_SERVER_PORT, DEFAULT_SERVER_PATH);
        }
 
-       endpoints = &url[0];
+       endpoints = url;
 
     if ((bundleContext_getProperty(context, CFG_ETCD_TTL, &ttlStr) != 
CELIX_SUCCESS) || !ttlStr) {
         ttl = DEFAULT_ETCD_TTL;
     }
     else
     {
-        char* endptr = (char*)ttlStr;
+        char* endptr = (char *) ttlStr;
         errno = 0;
-        ttl =  strtol(ttlStr, &endptr, 10);
+        ttl = strtol(ttlStr, &endptr, 10);
         if (*endptr || errno != 0) {
             ttl = DEFAULT_ETCD_TTL;
         }
     }
 
-       if (etcd_get(localNodePath, &value[0], &action[0], &modIndex) != true) {
+       if (etcd_get(localNodePath, &value, &modIndex) != true) {
                etcd_set(localNodePath, endpoints, ttl, false);
        }
        else if (etcd_set(localNodePath, endpoints, ttl, true) == false)  {
@@ -199,6 +184,8 @@ static celix_status_t 
etcdWatcher_addOwnFramework(etcd_watcher_pt watcher)
         status = CELIX_SUCCESS;
     }
 
+       FREE_MEM(value);
+
     return status;
 }
 
@@ -262,52 +249,47 @@ static celix_status_t 
etcdWatcher_removeEntry(etcd_watcher_pt watcher, char* key
 static void* etcdWatcher_run(void* data) {
        etcd_watcher_pt watcher = (etcd_watcher_pt) data;
        time_t timeBeforeWatch = time(NULL);
-       static char rootPath[MAX_ROOTNODE_LENGTH];
-       int highestModified = 0;
-       int errorCode=0;
+       char rootPath[MAX_ROOTNODE_LENGTH];
+       long long highestModified = 0;
 
        bundle_context_pt context = watcher->discovery->context;
 
        etcdWatcher_addAlreadyExistingWatchpoints(watcher->discovery, 
&highestModified);
-       etcdWatcher_getRootPath(context, &rootPath[0]);
+       etcdWatcher_getRootPath(context, rootPath);
 
        while (watcher->running) {
 
-        char rkey[MAX_KEY_LENGTH];
-               char value[MAX_VALUE_LENGTH];
-               char preValue[MAX_VALUE_LENGTH];
-               char action[MAX_ACTION_LENGTH];
-        int modIndex=0;
-
-        if (etcd_watch(rootPath, highestModified + 1, &action[0], 
&preValue[0], &value[0], &rkey[0], &modIndex, &errorCode) == true) {
-               if (strcmp(action, "set") == 0) {
-                       etcdWatcher_addEntry(watcher, &rkey[0], &value[0]);
-               } else if (strcmp(action, "delete") == 0) {
-                       etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]);
-               } else if (strcmp(action, "expire") == 0) {
-                       etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]);
-               } else if (strcmp(action, "update") == 0) {
-                       etcdWatcher_addEntry(watcher, &rkey[0], &value[0]);
-               } else {
-                       logHelper_log(*watcher->loghelper, 
OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action);
-               }
+               char *rkey = NULL;
+               char *value = NULL;
+               char *preValue = NULL;
+               char *action = NULL;
+               long long modIndex;
+
+        if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, 
&value, &rkey, &modIndex) == 0 && action != NULL) {
+                       if (strcmp(action, "set") == 0) {
+                               etcdWatcher_addEntry(watcher, rkey, value);
+                       } else if (strcmp(action, "delete") == 0) {
+                               etcdWatcher_removeEntry(watcher, rkey, value);
+                       } else if (strcmp(action, "expire") == 0) {
+                               etcdWatcher_removeEntry(watcher, rkey, value);
+                       } else if (strcmp(action, "update") == 0) {
+                               etcdWatcher_addEntry(watcher, rkey, value);
+                       } else {
+                               logHelper_log(*watcher->loghelper, 
OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action);
+                       }
 
-               highestModified = modIndex;
-        }
-        /* prevent busy waiting, in case etcd_watch returns false */
-        else {
-               switch (errorCode) {
-               case 401:
-                       // Etcd can store at most 1000 events
                        highestModified = modIndex;
-                       break;
-               default:
-                       break;
-               }
+        } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) {
+                       sleep(DEFAULT_ETCD_TTL / 4);
         }
 
+        FREE_MEM(action);
+        FREE_MEM(value);
+        FREE_MEM(preValue);
+        FREE_MEM(rkey);
+
                // update own framework uuid
-               if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL/2)) {
+               if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) {
                        etcdWatcher_addOwnFramework(watcher);
                        timeBeforeWatch = time(NULL);
                }
@@ -361,7 +343,11 @@ celix_status_t etcdWatcher_create(discovery_pt discovery, 
bundle_context_pt cont
                }
        }
 
-    status = etcd_init((char*)etcd_server, etcd_port);
+       if (etcd_init((char*) etcd_server, etcd_port, CURL_GLOBAL_DEFAULT) != 
0) {
+               status = CELIX_BUNDLE_EXCEPTION;
+       } else {
+               status = CELIX_SUCCESS;
+       }
 
     if (status == CELIX_SUCCESS) {
         etcdWatcher_addOwnFramework(*watcher);
@@ -391,7 +377,7 @@ celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) 
{
        celixThread_join(watcher->watcherThread, NULL);
 
        // register own framework
-       status = etcdWatcher_getLocalNodePath(watcher->discovery->context, 
&localNodePath[0]);
+       status = etcdWatcher_getLocalNodePath(watcher->discovery->context, 
localNodePath);
 
        if (status != CELIX_SUCCESS || etcd_del(localNodePath) == false)
        {

http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/remote_services/examples/CMakeLists.txt 
b/remote_services/examples/CMakeLists.txt
index f24fa6f..44b7733 100644
--- a/remote_services/examples/CMakeLists.txt
+++ b/remote_services/examples/CMakeLists.txt
@@ -83,7 +83,9 @@ if (RSA_EXAMPLES)
             BUNDLES discovery_etcd topology_manager remote_service_admin_http 
calculator shell shell_tui log_service log_writer
         )
         deploy_bundles_dir(remote-services-etcd DIR_NAME "endpoints"
-            BUNDLES org.apache.celix.calc.api.Calculator_endpoint
+            BUNDLES
+               org.apache.celix.calc.api.Calculator_endpoint
+               org.apache.celix.calc.api.Calculator2_endpoint
         )
 
         add_deploy("remote-services-etcd-client" 

Reply via email to