Hi Alexander,

The TTL specifies the time to live for the key in seconds. The discovery_etcd announces itself with a TTL of 30 seconds. If the etcd-watch of the Amdatu discovery etcd implementation (not the endpoint_discovery_poller) takes more time than this, the described behavior occurs. Because the etcd-watch is nothing more than a long poll, the timeout value has to be set accordingly. The Celix discovery-etcd implementation currently uses 10 seconds for that.

Regards,
  Bjoern


On 2014-10-07 18:26, Alexander Broekhuis wrote:
Hi Bjoern,

I've updated to trunk, and now when I try to run a Celix -> Amdatu example
using Etcd, it doesn't work as expected.
I haven't investigated this deeply, but when I disable the TTL code (i.e. revert the changes from this commit and from the follow-up fix), it works again. So somewhere in
this part something is going on.

What happens is that the service is picked up by Amdatu, is added, but
after some time is removed again. How is the TTL supposed to work? Is this a consequence of the Amdatu poller not polling quickly enough with respect to the TTL set in
Celix?

Thanks,

2014-09-25 6:56 GMT+02:00 <bpe...@apache.org>:

Author: bpetri
Date: Thu Sep 25 04:56:28 2014
New Revision: 1627454

URL: http://svn.apache.org/r1627454
Log:

CELIX-152: add ttl support for etcd


Modified:
    celix/trunk/remote_services/discovery_etcd/private/include/etcd.h
    celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c

Modified: celix/trunk/remote_services/discovery_etcd/private/include/etcd.h
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/include/etcd.h?rev=1627454&r1=1627453&r2=1627454&view=diff

==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/include/etcd.h
(original)
+++ celix/trunk/remote_services/discovery_etcd/private/include/etcd.h Thu
Sep 25 04:56:28 2014
@@ -21,7 +21,7 @@
 bool etcd_init(char* server, int port);
bool etcd_get(char* key, char* value, char*action, int* modifiedIndex);
 bool etcd_getNodes(char* directory, char** nodeNames, int* size);
-bool etcd_set(char* key, char* value);
+bool etcd_set(char* key, char* value, int ttl);
 bool etcd_del(char* key);
 bool etcd_watch(char* key, int index, char* action, char* prevValue,
char* value);


Modified: celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/src/etcd.c?rev=1627454&r1=1627453&r2=1627454&view=diff

==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
(original)
+++ celix/trunk/remote_services/discovery_etcd/private/src/etcd.c Thu Sep
25 04:56:28 2014
@@ -5,16 +5,17 @@
 #include <curl/curl.h>
 #include <jansson.h>

-
-
 #include "etcd.h"

+#define DEFAULT_CURL_TIMEOUT          10
+#define DEFAULT_CURL_CONECTTIMEOUT    10
+
 typedef enum {
        GET, PUT, DELETE
 } request_t;

 static char* etcd_server = NULL;
-static int etcd_port = NULL;
+static int etcd_port = 0;

 struct MemoryStruct {
        char *memory;
@@ -44,8 +45,8 @@ static int performRequest(char* url, req
        CURLcode res = 0;

        curl = curl_easy_init();
-       curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10);
-       curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 10);
+       curl_easy_setopt(curl, CURLOPT_TIMEOUT, DEFAULT_CURL_TIMEOUT);
+       curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT,
DEFAULT_CURL_CONECTTIMEOUT);
        curl_easy_setopt(curl, CURLOPT_URL, url);
        curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
@@ -175,7 +176,7 @@ bool etcd_getNodes(char* directory, char
 }

 //set
-bool etcd_set(char* key, char* value) {
+bool etcd_set(char* key, char* value, int ttl) {
        json_error_t error;
        json_t* js_root;
        json_t* js_node;
@@ -190,7 +191,11 @@ bool etcd_set(char* key, char* value) {
        reply.size = 0; /* no data at this point */

        snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s",
etcd_server, etcd_port, key);
-       snprintf(request, MAX_CONTENT_LENGTH, "value=%s", value);
+
+       if (ttl > 0)
+           snprintf(request, MAX_CONTENT_LENGTH, "value=%s", value);
+       else
+           snprintf(request, MAX_CONTENT_LENGTH, "value=%s;ttl=%d",
value, ttl);

        res = performRequest(url, PUT, WriteMemoryCallback, request,
(void*) &reply);
        if (res == CURLE_OPERATION_TIMEDOUT) {

Modified:
celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c?rev=1627454&r1=1627453&r2=1627454&view=diff

==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
(original)
+++ celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
Thu Sep 25 04:56:28 2014
@@ -53,6 +53,10 @@ struct etcd_watcher {
 #define CFG_ETCD_SERVER_PORT   "DISCOVERY_ETCD_SERVER_PORT"
 #define DEFAULT_ETCD_SERVER_PORT 4001

+// be careful - this should be higher than the curl timeout
+#define CFG_ETCD_TTL   "DISCOVERY_ETCD_TTL"
+#define DEFAULT_ETCD_TTL 30
+

 // note that the rootNode shouldn't have a leading slash
 static celix_status_t etcdWatcher_getRootPath(char* rootNode) {
@@ -132,6 +136,49 @@ static celix_status_t etcdWatcher_addAlr
        return status;
 }

+
+static celix_status_t etcdWatcher_addOwnFramework(bundle_context_pt
context)
+{
+    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+    char localNodePath[MAX_LOCALNODE_LENGTH];
+    char* endpoints = NULL;
+    char* ttlStr = NULL;
+    int ttl;
+
+    // register own framework
+    if ((status = etcdWatcher_getLocalNodePath(context,
&localNodePath[0])) != CELIX_SUCCESS) {
+        return status;
+    }
+
+    if ((bundleContext_getProperty(context, DISCOVERY_POLL_ENDPOINTS,
&endpoints) != CELIX_SUCCESS) || !endpoints) {
+        endpoints = DEFAULT_POLL_ENDPOINTS;
+    }
+
+    if ((bundleContext_getProperty(context, CFG_ETCD_TTL, &ttlStr) !=
CELIX_SUCCESS) || !ttlStr) {
+        ttl = DEFAULT_ETCD_TTL;
+    }
+    else
+    {
+        char* endptr = ttlStr;
+        errno = 0;
+        ttl =  strtol(ttlStr, &endptr, 10);
+        if (*endptr || errno != 0) {
+            ttl = DEFAULT_ETCD_TTL;
+        }
+    }
+
+    if (etcd_set(localNodePath, endpoints, ttl) == false)
+    {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "Cannot register local
discovery");
+    }
+    else
+    {
+        status = CELIX_SUCCESS;
+    }
+
+    return status;
+}
+
 /*
  * performs (blocking) etcd_watch calls to check for
  * changing discovery endpoint information within etcd.
@@ -161,6 +208,8 @@ static void* etcdWatcher_run(void* data)
fw_log(logger, OSGI_FRAMEWORK_LOG_INFO,
"Unexpected action: %s", action);
                        }
                }
+               // update own framework uuid in any case;
+           etcdWatcher_addOwnFramework(watcher->context);
        }

        return NULL;
@@ -174,11 +223,10 @@ celix_status_t etcdWatcher_create(endpoi
                etcd_watcher_pt *watcher)
 {
        celix_status_t status = CELIX_SUCCESS;
-       char localNodePath[MAX_LOCALNODE_LENGTH];
+
        char* etcd_server = NULL;
        char* etcd_port_string = NULL;
-       int etcd_port = NULL;
-       char* endpoints = NULL;
+       int etcd_port = 0;

        if (poller == NULL) {
                return CELIX_BUNDLE_EXCEPTION;
@@ -211,9 +259,6 @@ celix_status_t etcdWatcher_create(endpoi
                }
        }

- if ((bundleContext_getProperty(context, DISCOVERY_POLL_ENDPOINTS,
&endpoints) != CELIX_SUCCESS) || !endpoints) {
-               endpoints = DEFAULT_POLL_ENDPOINTS;
-       }


        if (etcd_init(etcd_server, etcd_port) == false)
@@ -221,17 +266,7 @@ celix_status_t etcdWatcher_create(endpoi
                return CELIX_BUNDLE_EXCEPTION;
        }

-       // register own framework
-       if ((status = etcdWatcher_getLocalNodePath(context,
&localNodePath[0])) != CELIX_SUCCESS) {
-               return status;
-       }
-
-       if (etcd_set(localNodePath, endpoints) == false)
-       {
-               fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "Cannot
register local discovery");
-       }
-
-
+       etcdWatcher_addOwnFramework(context);

if ((status = celixThreadMutex_create(&(*watcher)->watcherLock,
NULL)) != CELIX_SUCCESS) {
                return status;



Reply via email to