Repository: celix Updated Branches: refs/heads/develop 490811d2c -> 7b3d1a8e5
etcd_get_directory retrieves the index from the HTTP header instead of the last modified index in the content Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/7b3d1a8e Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/7b3d1a8e Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/7b3d1a8e Branch: refs/heads/develop Commit: 7b3d1a8e5ddca7b4f72897c2be8ff26e55e50a5d Parents: 490811d Author: Erjan Altena <erjanalt...@gmail.com> Authored: Thu Feb 8 14:20:54 2018 +0100 Committer: Erjan Altena <erjanalt...@gmail.com> Committed: Thu Feb 8 14:20:54 2018 +0100 ---------------------------------------------------------------------- etcdlib/src/etcd.c | 102 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 78 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/7b3d1a8e/etcdlib/src/etcd.c ---------------------------------------------------------------------- diff --git a/etcdlib/src/etcd.c b/etcdlib/src/etcd.c index cb5c0ff..861b07a 100644 --- a/etcdlib/src/etcd.c +++ b/etcdlib/src/etcd.c @@ -35,6 +35,8 @@ #define ETCD_JSON_MODIFIEDINDEX "modifiedIndex" #define ETCD_JSON_INDEX "index" +#define ETCD_HEADER_INDEX "X-Etcd-Index: " + #define MAX_OVERHEAD_LENGTH 64 #define DEFAULT_CURL_TIMEOUT 10 #define DEFAULT_CURL_CONECTTIMEOUT 10 @@ -48,14 +50,16 @@ static int etcd_port = 0; struct MemoryStruct { char *memory; - size_t size; + char *header; + size_t memorySize; + size_t headerSize; }; /** * Static function declarations */ -static int performRequest(char* url, request_t request, void* callback, void* reqData, void* repData); +static int performRequest(char* url, request_t request, void* reqData, void* repData); static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp); /** * External function definition @@ -92,12 +96,14 @@ int etcd_get(const char* key, char** value, int* modifiedIndex) { struct MemoryStruct reply; reply.memory = malloc(1); /* will be grown as needed by the realloc above */ - reply.size = 0; /* no data at this point */ + reply.memorySize = 0; /* no data at this point */ + reply.header = NULL; /* will be grown as needed by the realloc above */ + reply.headerSize = 0; /* no data at this point */ int retVal = -1; char *url; asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key); - res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply); + res = performRequest(url, GET, NULL, (void*) &reply); free(url); if (res == CURLE_OK) { @@ -178,6 +184,17 @@ static int etcd_get_recursive_values(json_t* js_root, etcd_key_value_callback ca return (*mod_index > 0 ? 0 : 1); } +static long long etcd_get_current_index(const char* headerData) { + long long index = -1; + char * indexStr = strstr(headerData, ETCD_HEADER_INDEX); + indexStr += strlen(ETCD_HEADER_INDEX); + + if (sscanf(indexStr, "%lld\n",&index) == 1) { + } else { + index = -1; + } + return index; +} /** * etcd_get_directory */ @@ -190,14 +207,16 @@ int etcd_get_directory(const char* directory, etcd_key_value_callback callback, struct MemoryStruct reply; reply.memory = malloc(1); /* will be grown as needed by the realloc above */ - reply.size = 0; /* no data at this point */ + reply.memorySize = 0; /* no data at this point */ + reply.header = malloc(1); /* will be grown as needed by the realloc above */ + reply.headerSize = 0; /* no data at this point */ int retVal = 0; char *url; asprintf(&url, "http://%s:%d/v2/keys/%s?recursive=true", etcd_server, etcd_port, directory); - res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply); + res = performRequest(url, GET, NULL, (void*) &reply); free(url); if (res == CURLE_OK) { js_root = json_loads(reply.memory, 0, &error); @@ -210,6 +229,10 @@ int etcd_get_directory(const char* directory, etcd_key_value_callback callback, if (js_rootnode != NULL) { *modifiedIndex = 0; retVal = etcd_get_recursive_values(js_rootnode, callback, arg, (json_int_t*)modifiedIndex); + long long indexFromHeader = etcd_get_current_index(reply.header); + if (indexFromHeader > *modifiedIndex) { + *modifiedIndex = indexFromHeader; + } } else { // Error occured, retrieve the index of ETCD from the error code js_rootnode = json_object_get(js_root, ETCD_JSON_INDEX); @@ -228,9 +251,8 @@ int etcd_get_directory(const char* directory, etcd_key_value_callback callback, } } - if (reply.memory) { - free(reply.memory); - } + free(reply.memory); + free(reply.header); return retVal; } @@ -257,7 +279,9 @@ int etcd_set(const char* key, const char* value, int ttl, bool prevExist) { } reply.memory = calloc(1, 1); /* will be grown as needed by the realloc above */ - reply.size = 0; /* no data at this point */ + reply.memorySize = 0; /* no data at this point */ + reply.header = NULL; /* will be grown as needed by the realloc above */ + reply.headerSize = 0; /* no data at this point */ asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key); @@ -270,7 +294,7 @@ int etcd_set(const char* key, const char* value, int ttl, bool prevExist) { requestPtr += snprintf(requestPtr, req_len-(requestPtr-request), ";prevExist=true"); } - res = performRequest(url, PUT, WriteMemoryCallback, request, (void*) &reply); + res = performRequest(url, PUT, request, (void*) &reply); if(url) { free(url); } @@ -346,13 +370,18 @@ int etcd_watch(const char* key, long long index, char** action, char** prevValue struct MemoryStruct reply; reply.memory = malloc(1); /* will be grown as needed by the realloc above */ - reply.size = 0; /* no data at this point */ + reply.memorySize = 0; /* no data at this point */ + reply.header = NULL; /* will be grown as needed by the realloc above */ + reply.headerSize = 0; /* no data at this point */ + + reply.header = malloc(1); /* will be grown as needed by the realloc above */ + reply.headerSize = 0; /* no data at this point */ if (index != 0) asprintf(&url, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%lld", etcd_server, etcd_port, key, index); else asprintf(&url, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key); - res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply); + res = performRequest(url, GET, NULL, (void*) &reply); if(url) free(url); if (res == CURLE_OK) { @@ -422,10 +451,12 @@ int etcd_del(const char* key) { struct MemoryStruct reply; reply.memory = malloc(1); /* will be grown as needed by the realloc above */ - reply.size = 0; /* no data at this point */ + reply.memorySize = 0; /* no data at this point */ + reply.header = NULL; /* will be grown as needed by the realloc above */ + reply.headerSize = 0; /* no data at this point */ asprintf(&url, "http://%s:%d/v2/keys/%s?recursive=true", etcd_server, etcd_port, key); - res = performRequest(url, DELETE, WriteMemoryCallback, NULL, (void*) &reply); + res = performRequest(url, DELETE, NULL, (void*) &reply); free(url); if (res == CURLE_OK) { @@ -443,9 +474,8 @@ int etcd_del(const char* key) { } } - if (reply.memory) { - free(reply.memory); - } + free(reply.memory); + free(reply.header); return retVal; } @@ -455,21 +485,41 @@ static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, voi size_t realsize = size * nmemb; struct MemoryStruct *mem = (struct MemoryStruct *) userp; - mem->memory = realloc(mem->memory, mem->size + realsize + 1); + mem->memory = realloc(mem->memory, mem->memorySize + realsize + 1); if (mem->memory == NULL) { /* out of memory! */ fprintf(stderr, "[ETCDLIB] Error: not enough memory (realloc returned NULL)\n"); return 0; } - memcpy(&(mem->memory[mem->size]), contents, realsize); - mem->size += realsize; - mem->memory[mem->size] = 0; + memcpy(&(mem->memory[mem->memorySize]), contents, realsize); + mem->memorySize += realsize; + mem->memory[mem->memorySize] = 0; return realsize; } -static int performRequest(char* url, request_t request, void* callback, void* reqData, void* repData) { +static size_t WriteHeaderCallback(void *contents, size_t size, size_t nmemb, void *userp) { + size_t realsize = size * nmemb; + struct MemoryStruct *mem = (struct MemoryStruct *) userp; + + mem->header = realloc(mem->header, mem->headerSize + realsize + 1); + if (mem->header == NULL) { + /* out of memory! */ + fprintf(stderr, "[ETCDLIB] Error: not enough header-memory (realloc returned NULL)\n"); + return 0; + } + + memcpy(&(mem->header[mem->headerSize]), contents, realsize); + mem->headerSize += realsize; + mem->header[mem->headerSize] = 0; + + return realsize; +} + + + +static int performRequest(char* url, request_t request, void* reqData, void* repData) { CURL *curl = NULL; CURLcode res = 0; curl = curl_easy_init(); @@ -478,8 +528,12 @@ static int performRequest(char* url, request_t request, void* callback, void* re curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, DEFAULT_CURL_CONECTTIMEOUT); curl_easy_setopt(curl, CURLOPT_URL, url); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback); curl_easy_setopt(curl, CURLOPT_WRITEDATA, repData); + if (((struct MemoryStruct*)repData)->header) { + curl_easy_setopt(curl, CURLOPT_HEADERDATA, repData); + curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, WriteHeaderCallback); + } if (request == PUT) { curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");