Repository: celix Updated Branches: refs/heads/develop 4e665476c -> d62731a3c
CELIX-395: discovery_etcd using etcdlib now Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/d62731a3 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/d62731a3 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/d62731a3 Branch: refs/heads/develop Commit: d62731a3cfe482d6796fea3fbd386091756f3b2a Parents: 4e66547 Author: Roy Lenferink <lenferink...@gmail.com> Authored: Fri Jan 27 10:27:48 2017 +0100 Committer: Roy Lenferink <lenferink...@gmail.com> Committed: Fri Jan 27 11:35:28 2017 +0100 ---------------------------------------------------------------------- remote_services/discovery_etcd/CMakeLists.txt | 4 +- .../private/include/discovery_impl.h | 4 +- .../discovery_etcd/private/include/etcd.h | 61 --- .../discovery_etcd/private/src/etcd.c | 397 ------------------- .../discovery_etcd/private/src/etcd_watcher.c | 168 ++++---- remote_services/examples/CMakeLists.txt | 4 +- 6 files changed, 83 insertions(+), 555 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/remote_services/discovery_etcd/CMakeLists.txt b/remote_services/discovery_etcd/CMakeLists.txt index 31ba269..442d486 100644 --- a/remote_services/discovery_etcd/CMakeLists.txt +++ b/remote_services/discovery_etcd/CMakeLists.txt @@ -24,6 +24,7 @@ if (RSA_DISCOVERY_ETCD) include_directories("${CURL_INCLUDE_DIR}") include_directories("${JANSSON_INCLUDE_DIR}") include_directories("${LIBXML2_INCLUDE_DIR}") + include_directories("${PROJECT_SOURCE_DIR}/etcdlib/public/include") include_directories("${PROJECT_SOURCE_DIR}/utils/public/include") include_directories("${PROJECT_SOURCE_DIR}/remote_services/utils/private/include") include_directories("${PROJECT_SOURCE_DIR}/remote_services/utils/public/include") @@ -40,7 +41,6 @@ if (RSA_DISCOVERY_ETCD) NAME "Apache Celix RSA Discovery ETCD" SOURCES private/src/discovery_impl.c - private/src/etcd.c private/src/etcd_watcher.c ${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery_activator.c ${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery.c @@ -56,6 +56,6 @@ if (RSA_DISCOVERY_ETCD) install_bundle(discovery_etcd) - target_link_libraries(discovery_etcd celix_framework ${CURL_LIBRARIES} ${LIBXML2_LIBRARIES} ${JANSSON_LIBRARIES}) + target_link_libraries(discovery_etcd celix_framework etcdlib ${CURL_LIBRARIES} ${LIBXML2_LIBRARIES} ${JANSSON_LIBRARIES}) endif (RSA_DISCOVERY_ETCD) http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/include/discovery_impl.h ---------------------------------------------------------------------- diff --git a/remote_services/discovery_etcd/private/include/discovery_impl.h b/remote_services/discovery_etcd/private/include/discovery_impl.h index e7e1071..a19b145 100644 --- a/remote_services/discovery_etcd/private/include/discovery_impl.h +++ b/remote_services/discovery_etcd/private/include/discovery_impl.h @@ -45,9 +45,7 @@ #define DEFAULT_POLL_ENDPOINTS "" -#define MAX_ROOTNODE_LENGTH 64 -#define MAX_LOCALNODE_LENGTH 256 - +#define FREE_MEM(ptr) if(ptr) {free(ptr); ptr = NULL;} struct discovery { bundle_context_pt context; http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/include/etcd.h ---------------------------------------------------------------------- diff --git a/remote_services/discovery_etcd/private/include/etcd.h b/remote_services/discovery_etcd/private/include/etcd.h deleted file mode 100644 index 2ba09de..0000000 --- a/remote_services/discovery_etcd/private/include/etcd.h +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * etcd.h - * - * \date 26 Jul 2014 - * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - - -#ifndef ETCD_H_ -#define ETCD_H_ - -#include <stdbool.h> -#include <celix_errno.h> - -#define MAX_NODES 256 - -#define MAX_KEY_LENGTH 256 -#define MAX_VALUE_LENGTH 256 -#define MAX_ACTION_LENGTH 64 - -#define MAX_URL_LENGTH 256 -#define MAX_CONTENT_LENGTH 1024 - -#define ETCD_JSON_NODE "node" -#define ETCD_JSON_PREVNODE "prevNode" -#define ETCD_JSON_NODES "nodes" -#define ETCD_JSON_ACTION "action" -#define ETCD_JSON_KEY "key" -#define ETCD_JSON_VALUE "value" -#define ETCD_JSON_MODIFIEDINDEX "modifiedIndex" -#define ETCD_ERROR_INDICATION "errorCode" -#define ETCD_INDEX "index" - -celix_status_t etcd_init(char* server, int port); -bool etcd_get(char* key, char* value, char*action, int* modifiedIndex); -bool etcd_getNodes(char* directory, char** nodeNames, int* size); -bool etcd_set(char* key, char* value, int ttl, bool prevExist); -bool etcd_del(char* key); -bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int *modifiedIndex, int *error); - -#endif /* ETCD_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/src/etcd.c ---------------------------------------------------------------------- diff --git a/remote_services/discovery_etcd/private/src/etcd.c b/remote_services/discovery_etcd/private/src/etcd.c deleted file mode 100644 index 1b74f66..0000000 --- a/remote_services/discovery_etcd/private/src/etcd.c +++ /dev/null @@ -1,397 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * etcd.c - * - * \date 26 Jul 2014 - * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - - -#include <stdio.h> -#include <stdbool.h> -#include <string.h> - -#include <curl/curl.h> -#include <jansson.h> - -#include "etcd.h" - -#define DEFAULT_CURL_TIMEOUT 10 -#define DEFAULT_CURL_CONECTTIMEOUT 10 - -typedef enum { - GET, PUT, DELETE -} request_t; - -static char* etcd_server = NULL; -static int etcd_port = 0; - -struct MemoryStruct { - char *memory; - size_t size; -}; - -static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) { - size_t realsize = size * nmemb; - struct MemoryStruct *mem = (struct MemoryStruct *) userp; - - mem->memory = realloc(mem->memory, mem->size + realsize + 1); - if (mem->memory == NULL) { - /* out of memory! */ - printf("not enough memory (realloc returned NULL)\n"); - return 0; - } - - memcpy(&(mem->memory[mem->size]), contents, realsize); - mem->size += realsize; - mem->memory[mem->size] = 0; - - return realsize; -} - -static int performRequest(char* url, request_t request, void* callback, void* reqData, void* repData) { - CURL *curl = NULL; - CURLcode res = 0; - - curl = curl_easy_init(); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, DEFAULT_CURL_TIMEOUT); - curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, DEFAULT_CURL_CONECTTIMEOUT); - curl_easy_setopt(curl, CURLOPT_URL, url); - curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, repData); - curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1); - - if (request == PUT) { - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT"); - curl_easy_setopt(curl, CURLOPT_POST, 1L); -// curl_easy_setopt(curl, CURLOPT_HTTPHEADER, "Content-type: application/json"); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, reqData); - } else if (request == DELETE) { - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE"); - } else if (request == GET) { - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET"); - } - - res = curl_easy_perform(curl); - curl_easy_cleanup(curl); - - return res; -} - -// open -celix_status_t etcd_init(char* server, int port) { - celix_status_t status = CELIX_SUCCESS; - - etcd_server = server; - etcd_port = port; - - return status; -} - -// get -bool etcd_get(char* key, char* value, char* action, int* modifiedIndex) { - json_t* js_root = NULL; - json_t* js_node = NULL; - json_t* js_value = NULL; - json_t* js_modifiedIndex = NULL; - json_error_t error; - int res; - struct MemoryStruct reply; - - reply.memory = malloc(1); /* will be grown as needed by the realloc above */ - reply.size = 0; /* no data at this point */ - - bool retVal = false; - char url[MAX_URL_LENGTH]; - snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key); - - res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply); - - if (res == CURLE_OK) { - js_root = json_loads(reply.memory, 0, &error); - - if (js_root != NULL) { - js_node = json_object_get(js_root, ETCD_JSON_NODE); - } - if (js_node != NULL) { - js_value = json_object_get(js_node, ETCD_JSON_VALUE); - js_modifiedIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX); - - if (js_modifiedIndex != NULL) { - *modifiedIndex = json_integer_value(js_modifiedIndex); - } - - if (js_value != NULL) { - snprintf(value, MAX_VALUE_LENGTH, "%s", json_string_value(js_value)); - retVal = true; - } - } - if (js_root != NULL) { - json_decref(js_root); - } - } - - if (reply.memory) { - free(reply.memory); - } - - - return retVal; -} - -// getNodes -bool etcd_getNodes(char* directory, char** nodeNames, int* size) { - json_t* js_root = NULL; - json_t* js_node = NULL; - json_t* js_nodes = NULL; - json_error_t error; - int res; - struct MemoryStruct reply; - - reply.memory = malloc(1); /* will be grown as needed by the realloc above */ - reply.size = 0; /* no data at this point */ - - bool retVal = false; - char url[MAX_URL_LENGTH]; - snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, directory); - - res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply); - - if (res == CURLE_OK) { - js_root = json_loads(reply.memory, 0, &error); - - if (js_root != NULL) { - js_node = json_object_get(js_root, ETCD_JSON_NODE); - } - if (js_node != NULL) { - js_nodes = json_object_get(js_node, ETCD_JSON_NODES); - } - - if (js_nodes != NULL && json_is_array(js_nodes)) { - int i = 0; - retVal = true; - - for (i = 0; i < json_array_size(js_nodes) && i < MAX_NODES; i++) { - json_t* js_node = json_array_get(js_nodes, i); - - if (!json_is_object(js_node)) { - retVal = false; - } else { - json_t* js_key = json_object_get(js_node, ETCD_JSON_KEY); - snprintf(nodeNames[i], MAX_KEY_LENGTH, "%s", json_string_value(js_key)); - } - } - *size = i; - } - if (js_root != NULL) { - json_decref(js_root); - } - } - - if (reply.memory) { - free(reply.memory); - } - - return retVal; -} - - - -bool etcd_set(char* key, char* value, int ttl, bool prevExist) { - json_error_t error; - json_t* js_root = NULL; - json_t* js_node = NULL; - json_t* js_value = NULL; - bool retVal = false; - char url[MAX_URL_LENGTH]; - char request[MAX_CONTENT_LENGTH]; - char* cur = request; - int res; - struct MemoryStruct reply; - - reply.memory = malloc(1); /* will be grown as needed by the realloc above */ - reply.size = 0; /* no data at this point */ - - snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key); - cur += snprintf(cur, MAX_CONTENT_LENGTH, "value=%s", value); - - if (ttl > 0) - cur += snprintf(cur, MAX_CONTENT_LENGTH, ";ttl=%d", ttl); - - if (prevExist) - cur += snprintf(cur, MAX_CONTENT_LENGTH, ";prevExist=true"); - - res = performRequest(url, PUT, WriteMemoryCallback, request, (void*) &reply); - - if (res == CURLE_OK) { - js_root = json_loads(reply.memory, 0, &error); - - if (js_root != NULL) { - js_node = json_object_get(js_root, ETCD_JSON_NODE); - } - if (js_node != NULL) { - js_value = json_object_get(js_node, ETCD_JSON_VALUE); - } - if (js_value != NULL && json_is_string(js_value)) { - retVal = (strcmp(json_string_value(js_value), value) == 0); - } - if (js_root != NULL) { - json_decref(js_root); - } - } - - if (reply.memory) { - free(reply.memory); - } - - return retVal; -} - - - -//delete -bool etcd_del(char* key) { - json_error_t error; - json_t* js_root = NULL; - json_t* js_node = NULL; - bool retVal = false; - char url[MAX_URL_LENGTH]; - char request[MAX_CONTENT_LENGTH]; - int res; - struct MemoryStruct reply; - - reply.memory = malloc(1); /* will be grown as needed by the realloc above */ - reply.size = 0; /* no data at this point */ - - snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key); - res = performRequest(url, DELETE, WriteMemoryCallback, request, (void*) &reply); - - if (res == CURLE_OK) { - js_root = json_loads(reply.memory, 0, &error); - - if (js_root != NULL) { - js_node = json_object_get(js_root, ETCD_JSON_NODE); - } - - retVal = (js_node != NULL); - - if (js_root != NULL) { - json_decref(js_root); - } - } - - if (reply.memory) { - free(reply.memory); - } - - - return retVal; -} - -///watch - -bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int* modifiedIndex, int* errorCode) { - json_error_t error; - json_t* js_root = NULL; - json_t* js_node = NULL; - json_t* js_prevNode = NULL; - json_t* js_action = NULL; - json_t* js_value = NULL; - json_t* js_rkey = NULL; - json_t* js_prevValue = NULL; - json_t* js_modIndex = NULL; - json_t* js_error = NULL; // used to indicate valid json response with ETCD error indication - bool retVal = false; - *errorCode = 0; - char url[MAX_URL_LENGTH]; - int res; - struct MemoryStruct reply; - - reply.memory = malloc(1); /* will be grown as needed by the realloc above */ - reply.size = 0; /* no data at this point */ - - if (index != 0) - snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d", etcd_server, etcd_port, key, index); - else - snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key); - - res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply); - - if (res == CURLE_OK) { - - js_root = json_loads(reply.memory, 0, &error); - - if (js_root != NULL) { - js_action = json_object_get(js_root, ETCD_JSON_ACTION); - js_node = json_object_get(js_root, ETCD_JSON_NODE); - js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE); - js_error = json_object_get(js_root, ETCD_ERROR_INDICATION); - } - if (js_prevNode != NULL) { - js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE); - } - if (js_node != NULL) { - js_rkey = json_object_get(js_node, ETCD_JSON_KEY); - js_value = json_object_get(js_node, ETCD_JSON_VALUE); - js_modIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX); - } - if (js_prevNode != NULL) { - js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE); - } - if ((js_prevValue != NULL) && (json_is_string(js_prevValue))) { - strncpy(prevValue, json_string_value(js_prevValue), MAX_VALUE_LENGTH); - } - if ((js_value != NULL) && (json_is_string(js_value))) { - strncpy(value, json_string_value(js_value), MAX_VALUE_LENGTH); - } - if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) { - *modifiedIndex = json_integer_value(js_modIndex); - } else { - *modifiedIndex = index; - } - - if ((js_rkey != NULL) && (js_action != NULL) && (json_is_string(js_rkey)) && (json_is_string(js_action))) { - strncpy(rkey, json_string_value(js_rkey), MAX_KEY_LENGTH); - strncpy(action, json_string_value(js_action), MAX_ACTION_LENGTH); - - retVal = true; - } - if ((js_error != NULL) && (json_is_integer(js_error))) { - *errorCode = json_integer_value(js_error); - js_modIndex = json_object_get(js_root, ETCD_INDEX); - if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) { - *modifiedIndex = json_integer_value(js_modIndex); - } - } - if (js_root != NULL) { - json_decref(js_root); - } - - } - - if (reply.memory) { - free(reply.memory); - } - - return retVal; -} http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/src/etcd_watcher.c ---------------------------------------------------------------------- diff --git a/remote_services/discovery_etcd/private/src/etcd_watcher.c b/remote_services/discovery_etcd/private/src/etcd_watcher.c index 7e1ce33..a09002a 100644 --- a/remote_services/discovery_etcd/private/src/etcd_watcher.c +++ b/remote_services/discovery_etcd/private/src/etcd_watcher.c @@ -26,6 +26,7 @@ #include <stdbool.h> #include <stdlib.h> +#include <unistd.h> #include <string.h> #include "log_helper.h" @@ -35,6 +36,7 @@ #include "discovery.h" #include "discovery_impl.h" +#include <curl/curl.h> #include "etcd.h" #include "etcd_watcher.h" @@ -51,18 +53,23 @@ struct etcd_watcher { volatile bool running; }; -#define CFG_ETCD_ROOT_PATH "DISCOVERY_ETCD_ROOT_PATH" -#define DEFAULT_ETCD_ROOTPATH "discovery" -#define CFG_ETCD_SERVER_IP "DISCOVERY_ETCD_SERVER_IP" -#define DEFAULT_ETCD_SERVER_IP "127.0.0.1" +#define MAX_ROOTNODE_LENGTH 128 +#define MAX_LOCALNODE_LENGTH 4096 +#define MAX_VALUE_LENGTH 256 -#define CFG_ETCD_SERVER_PORT "DISCOVERY_ETCD_SERVER_PORT" -#define DEFAULT_ETCD_SERVER_PORT 2379 +#define CFG_ETCD_ROOT_PATH "DISCOVERY_ETCD_ROOT_PATH" +#define DEFAULT_ETCD_ROOTPATH "discovery" + +#define CFG_ETCD_SERVER_IP "DISCOVERY_ETCD_SERVER_IP" +#define DEFAULT_ETCD_SERVER_IP "127.0.0.1" + +#define CFG_ETCD_SERVER_PORT "DISCOVERY_ETCD_SERVER_PORT" +#define DEFAULT_ETCD_SERVER_PORT 2379 // be careful - this should be higher than the curl timeout -#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL" -#define DEFAULT_ETCD_TTL 30 +#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL" +#define DEFAULT_ETCD_TTL 30 // note that the rootNode shouldn't have a leading slash @@ -71,36 +78,41 @@ static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* r const char* rootPath = NULL; if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) { - strcpy(rootNode, DEFAULT_ETCD_ROOTPATH); + strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH); } else { - strcpy(rootNode, rootPath); + strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH); } return status; } - static celix_status_t etcdWatcher_getLocalNodePath(bundle_context_pt context, char* localNodePath) { celix_status_t status = CELIX_SUCCESS; char rootPath[MAX_ROOTNODE_LENGTH]; const char* uuid = NULL; - if ((etcdWatcher_getRootPath(context, &rootPath[0]) != CELIX_SUCCESS)) { + if ((etcdWatcher_getRootPath(context, rootPath) != CELIX_SUCCESS)) { status = CELIX_ILLEGAL_STATE; } else if (((bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &uuid)) != CELIX_SUCCESS) || (!uuid)) { status = CELIX_ILLEGAL_STATE; } - else if (rootPath[strlen(&rootPath[0]) - 1] == '/') { - snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s%s", &rootPath[0], uuid); + else if (rootPath[strlen(rootPath) - 1] == '/') { + snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s%s", rootPath, uuid); } else { - snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s/%s", &rootPath[0], uuid); + snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s/%s", rootPath, uuid); } return status; } + +static void add_node(const char *key, const char *value, void* arg) { + discovery_pt discovery = (discovery_pt) arg; + endpointDiscoveryPoller_addDiscoveryEndpoint(discovery->poller, (char *) value); +} + /* * retrieves all already existing discovery endpoints * from etcd and adds them to the poller. @@ -108,44 +120,18 @@ static celix_status_t etcdWatcher_getLocalNodePath(bundle_context_pt context, ch * returns the modifiedIndex of the last modified * discovery endpoint (see etcd documentation). */ -static celix_status_t etcdWatcher_addAlreadyExistingWatchpoints(discovery_pt discovery, int* highestModified) { +static celix_status_t etcdWatcher_addAlreadyExistingWatchpoints(discovery_pt discovery, long long* highestModified) { celix_status_t status = CELIX_SUCCESS; - char** nodeArr = calloc(MAX_NODES, sizeof(*nodeArr)); - char rootPath[MAX_ROOTNODE_LENGTH]; - int i, size; - - *highestModified = -1; - for (i = 0; i < MAX_NODES; i++) { - nodeArr[i] = calloc(MAX_KEY_LENGTH, sizeof(*nodeArr[i])); - } + char rootPath[MAX_ROOTNODE_LENGTH]; + status = etcdWatcher_getRootPath(discovery->context, rootPath); - // we need to go though all nodes and get the highest modifiedIndex - if (((status = etcdWatcher_getRootPath(discovery->context, &rootPath[0])) == CELIX_SUCCESS) && - (etcd_getNodes(rootPath, nodeArr, &size) == true)) { - for (i = 0; i < size; i++) { - char* key = nodeArr[i]; - char value[MAX_VALUE_LENGTH]; - char action[MAX_VALUE_LENGTH]; - int modIndex; - - if (etcd_get(key, &value[0], &action[0], &modIndex) == true) { - // TODO: check that this is not equals to the local endpoint - endpointDiscoveryPoller_addDiscoveryEndpoint(discovery->poller, &value[0]); - - if (modIndex > *highestModified) { - *highestModified = modIndex; - } - } + if (status == CELIX_SUCCESS) { + if(etcd_get_directory(rootPath, add_node, discovery, highestModified)) { + status = CELIX_ILLEGAL_ARGUMENT; } } - for (i = 0; i < MAX_NODES; i++) { - free(nodeArr[i]); - } - - free(nodeArr); - return status; } @@ -154,8 +140,7 @@ static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt watcher) { celix_status_t status = CELIX_BUNDLE_EXCEPTION; char localNodePath[MAX_LOCALNODE_LENGTH]; - char value[MAX_VALUE_LENGTH]; - char action[MAX_VALUE_LENGTH]; + char *value; char url[MAX_VALUE_LENGTH]; int modIndex; char* endpoints = NULL; @@ -166,30 +151,30 @@ static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt watcher) endpoint_discovery_server_pt server = watcher->discovery->server; // register own framework - if ((status = etcdWatcher_getLocalNodePath(context, &localNodePath[0])) != CELIX_SUCCESS) { + if ((status = etcdWatcher_getLocalNodePath(context, localNodePath)) != CELIX_SUCCESS) { return status; } - if (endpointDiscoveryServer_getUrl(server, &url[0]) != CELIX_SUCCESS) { + if (endpointDiscoveryServer_getUrl(server, url) != CELIX_SUCCESS) { snprintf(url, MAX_VALUE_LENGTH, "http://%s:%s/%s", DEFAULT_SERVER_IP, DEFAULT_SERVER_PORT, DEFAULT_SERVER_PATH); } - endpoints = &url[0]; + endpoints = url; if ((bundleContext_getProperty(context, CFG_ETCD_TTL, &ttlStr) != CELIX_SUCCESS) || !ttlStr) { ttl = DEFAULT_ETCD_TTL; } else { - char* endptr = (char*)ttlStr; + char* endptr = (char *) ttlStr; errno = 0; - ttl = strtol(ttlStr, &endptr, 10); + ttl = strtol(ttlStr, &endptr, 10); if (*endptr || errno != 0) { ttl = DEFAULT_ETCD_TTL; } } - if (etcd_get(localNodePath, &value[0], &action[0], &modIndex) != true) { + if (etcd_get(localNodePath, &value, &modIndex) != true) { etcd_set(localNodePath, endpoints, ttl, false); } else if (etcd_set(localNodePath, endpoints, ttl, true) == false) { @@ -199,6 +184,8 @@ static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt watcher) status = CELIX_SUCCESS; } + FREE_MEM(value); + return status; } @@ -262,52 +249,47 @@ static celix_status_t etcdWatcher_removeEntry(etcd_watcher_pt watcher, char* key static void* etcdWatcher_run(void* data) { etcd_watcher_pt watcher = (etcd_watcher_pt) data; time_t timeBeforeWatch = time(NULL); - static char rootPath[MAX_ROOTNODE_LENGTH]; - int highestModified = 0; - int errorCode=0; + char rootPath[MAX_ROOTNODE_LENGTH]; + long long highestModified = 0; bundle_context_pt context = watcher->discovery->context; etcdWatcher_addAlreadyExistingWatchpoints(watcher->discovery, &highestModified); - etcdWatcher_getRootPath(context, &rootPath[0]); + etcdWatcher_getRootPath(context, rootPath); while (watcher->running) { - char rkey[MAX_KEY_LENGTH]; - char value[MAX_VALUE_LENGTH]; - char preValue[MAX_VALUE_LENGTH]; - char action[MAX_ACTION_LENGTH]; - int modIndex=0; - - if (etcd_watch(rootPath, highestModified + 1, &action[0], &preValue[0], &value[0], &rkey[0], &modIndex, &errorCode) == true) { - if (strcmp(action, "set") == 0) { - etcdWatcher_addEntry(watcher, &rkey[0], &value[0]); - } else if (strcmp(action, "delete") == 0) { - etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]); - } else if (strcmp(action, "expire") == 0) { - etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]); - } else if (strcmp(action, "update") == 0) { - etcdWatcher_addEntry(watcher, &rkey[0], &value[0]); - } else { - logHelper_log(*watcher->loghelper, OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action); - } + char *rkey = NULL; + char *value = NULL; + char *preValue = NULL; + char *action = NULL; + long long modIndex; + + if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) { + if (strcmp(action, "set") == 0) { + etcdWatcher_addEntry(watcher, rkey, value); + } else if (strcmp(action, "delete") == 0) { + etcdWatcher_removeEntry(watcher, rkey, value); + } else if (strcmp(action, "expire") == 0) { + etcdWatcher_removeEntry(watcher, rkey, value); + } else if (strcmp(action, "update") == 0) { + etcdWatcher_addEntry(watcher, rkey, value); + } else { + logHelper_log(*watcher->loghelper, OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action); + } - highestModified = modIndex; - } - /* prevent busy waiting, in case etcd_watch returns false */ - else { - switch (errorCode) { - case 401: - // Etcd can store at most 1000 events highestModified = modIndex; - break; - default: - break; - } + } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) { + sleep(DEFAULT_ETCD_TTL / 4); } + FREE_MEM(action); + FREE_MEM(value); + FREE_MEM(preValue); + FREE_MEM(rkey); + // update own framework uuid - if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL/2)) { + if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) { etcdWatcher_addOwnFramework(watcher); timeBeforeWatch = time(NULL); } @@ -361,7 +343,11 @@ celix_status_t etcdWatcher_create(discovery_pt discovery, bundle_context_pt cont } } - status = etcd_init((char*)etcd_server, etcd_port); + if (etcd_init((char*) etcd_server, etcd_port, CURL_GLOBAL_DEFAULT) != 0) { + status = CELIX_BUNDLE_EXCEPTION; + } else { + status = CELIX_SUCCESS; + } if (status == CELIX_SUCCESS) { etcdWatcher_addOwnFramework(*watcher); @@ -391,7 +377,7 @@ celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) { celixThread_join(watcher->watcherThread, NULL); // register own framework - status = etcdWatcher_getLocalNodePath(watcher->discovery->context, &localNodePath[0]); + status = etcdWatcher_getLocalNodePath(watcher->discovery->context, localNodePath); if (status != CELIX_SUCCESS || etcd_del(localNodePath) == false) { http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/examples/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/remote_services/examples/CMakeLists.txt b/remote_services/examples/CMakeLists.txt index f24fa6f..44b7733 100644 --- a/remote_services/examples/CMakeLists.txt +++ b/remote_services/examples/CMakeLists.txt @@ -83,7 +83,9 @@ if (RSA_EXAMPLES) BUNDLES discovery_etcd topology_manager remote_service_admin_http calculator shell shell_tui log_service log_writer ) deploy_bundles_dir(remote-services-etcd DIR_NAME "endpoints" - BUNDLES org.apache.celix.calc.api.Calculator_endpoint + BUNDLES + org.apache.celix.calc.api.Calculator_endpoint + org.apache.celix.calc.api.Calculator2_endpoint ) add_deploy("remote-services-etcd-client"