Hi Alexander,

thanks for testing - just committed the fix. Allocation on the heap is needed as the discovery_etcd needs to pick up the port for its annunciation.

Regards,
  Bjoern



On 2014-10-17 16:50, Alexander Broekhuis wrote:
Hi Bjoern,

This change causes a crash when shutting down the application. More
specific, if the initial port can be used, the port is not allocated, but once the webserver does not start, an allocation is done. So the free in endpointDiscoveryServer_destroy is ok in the latter, but not needed in the
former case.

Is there a need to allocate something? It could just be done on stack as
far as I can tell.


2014-10-17 9:05 GMT-04:00 <[email protected]>:

Author: bpetri
Date: Fri Oct 17 13:05:20 2014
New Revision: 1632567

URL: http://svn.apache.org/r1632567
Log:
CELIX-169: Add port-collision auto-correction to discovery

Modified:
    celix/trunk/remote_services/discovery/private/include/discovery.h

celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h

celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c

celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h

celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h

celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
    celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c

Modified: celix/trunk/remote_services/discovery/private/include/discovery.h
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/include/discovery.h?rev=1632567&r1=1632566&r2=1632567&view=diff

==============================================================================
--- celix/trunk/remote_services/discovery/private/include/discovery.h
(original)
+++ celix/trunk/remote_services/discovery/private/include/discovery.h Fri
Oct 17 13:05:20 2014
@@ -33,6 +33,8 @@
 #include "endpoint_description.h"
 #include "endpoint_listener.h"

+#define DISCOVERY_SERVER_INTERFACE "DISCOVERY_CFG_SERVER_INTERFACE"
+#define DISCOVERY_SERVER_IP            "DISCOVERY_CFG_SERVER_IP"
 #define DISCOVERY_SERVER_PORT          "DISCOVERY_CFG_SERVER_PORT"
 #define DISCOVERY_SERVER_PATH          "DISCOVERY_CFG_SERVER_PATH"
 #define DISCOVERY_POLL_ENDPOINTS       "DISCOVERY_CFG_POLL_ENDPOINTS"

Modified:
celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h?rev=1632567&r1=1632566&r2=1632567&view=diff

==============================================================================
---
celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h
(original)
+++
celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h
Fri Oct 17 13:05:20 2014
@@ -68,4 +68,14 @@ celix_status_t endpointDiscoveryServer_a
  */
 celix_status_t endpointDiscoveryServer_removeEndpoint(
endpoint_discovery_server_pt server, endpoint_description_pt endpoint);

+/**
+ * Removes the url, which is used by the discovery server to announce the
endpoints
+ *
+ * @param server [in] the endpoint discovery server to retrieve the url
from
+ * @param url [out] url which is used to announce the endpoints.
+ * @return CELIX_SUCCESS when successful.
+ */
+celix_status_t
endpointDiscoveryServer_getUrl(endpoint_discovery_server_pt server, char*
url);
+
+
 #endif /* ENDPOINT_DISCOVERY_SERVER_H_ */

Modified:
celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c?rev=1632567&r1=1632566&r2=1632567&view=diff

==============================================================================
---
celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c
(original)
+++
celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c
Fri Oct 17 13:05:20 2014
@@ -25,7 +25,10 @@
  */
 #include <stdlib.h>
 #include <stdint.h>
-
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <ifaddrs.h>
 #include "civetweb.h"
 #include "celix_errno.h"
 #include "utils.h"
@@ -36,7 +39,8 @@
 #include "endpoint_descriptor_writer.h"
 #include "endpoint_discovery_server.h"

-
+// defines how often the webserver is restarted (with an increased port
number)
+#define MAX_NUMBER_OF_RESTARTS         5
 #define DEFAULT_SERVER_THREADS "1"

 #define CIVETWEB_REQUEST_NOT_HANDLED 0
@@ -54,16 +58,23 @@ struct endpoint_discovery_server {
     celix_thread_mutex_t serverLock;

     const char* path;
+    const char *port;
+    const char* ip;
     struct mg_context* ctx;
 };

 // Forward declarations...
static int endpointDiscoveryServer_callback(struct mg_connection *conn);
 static char* format_path(char* path);
+static celix_status_t endpointDiscoveryServer_getIpAdress(char*
interface, char** ip);

 celix_status_t endpointDiscoveryServer_create(discovery_pt discovery,
bundle_context_pt context, endpoint_discovery_server_pt *server) {
        celix_status_t status = CELIX_SUCCESS;

+       char *port = 0;
+       char *ip = NULL;
+       char *path = NULL;
+
        *server = malloc(sizeof(struct endpoint_discovery_server));
        if (!*server) {
                return CELIX_ENOMEM;
@@ -79,13 +90,34 @@ celix_status_t endpointDiscoveryServer_c
                return CELIX_BUNDLE_EXCEPTION;
        }

-       char *port = NULL;
+       bundleContext_getProperty(context, DISCOVERY_SERVER_IP, &ip);
+       if (ip == NULL) {
+               char *interface = NULL;
+
+               bundleContext_getProperty(context,
DISCOVERY_SERVER_INTERFACE, &interface);
+               if ((interface != NULL) &&
(endpointDiscoveryServer_getIpAdress(interface, &ip) != CELIX_SUCCESS)) { + fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "Could
not retrieve IP adress for interface %s", interface);
+               }
+
+               if (ip == NULL) {
+ endpointDiscoveryServer_getIpAdress(NULL, &ip);
+               }
+       }
+
+       if (ip != NULL) {
+               fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Using %s for
service annunciation", ip);
+               (*server)->ip = strdup(ip);
+       }
+       else {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "No IP address
for service annunciation set. Using %s", DEFAULT_SERVER_IP);
+               (*server)->ip = (char*) DEFAULT_SERVER_IP;
+       }
+
bundleContext_getProperty(context, DISCOVERY_SERVER_PORT, &port);
        if (port == NULL) {
                port = DEFAULT_SERVER_PORT;
        }

-       char *path = NULL;
bundleContext_getProperty(context, DISCOVERY_SERVER_PATH, &path);
        if (path == NULL) {
                path = DEFAULT_SERVER_PATH;
@@ -93,19 +125,56 @@ celix_status_t endpointDiscoveryServer_c

        (*server)->path = format_path(path);

-       const char *options[] = {
-               "listening_ports", port,
-               "num_threads", DEFAULT_SERVER_THREADS,
-               NULL
-       };
-
        const struct mg_callbacks callbacks = {
                .begin_request = endpointDiscoveryServer_callback,
        };

-       (*server)->ctx = mg_start(&callbacks, (*server), options);
+       unsigned int port_counter = 0;
+
+       do {
+               const char *options[] = {
+                       "listening_ports", port,
+                       "num_threads", DEFAULT_SERVER_THREADS,
+                       NULL
+               };
+
+ (*server)->ctx = mg_start(&callbacks, (*server), options);
+
+               if ((*server)->ctx != NULL)
+               {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Starting
discovery server on port %s...", port);
+                       (*server)->port = port;
+               }
+               else {
+                       errno = 0;
+                       char* newPort = calloc(10, sizeof(*newPort));
+               char* endptr = port;
+               int currentPort = strtol(port, &endptr, 10);
+
+               if (*endptr || errno != 0) {
+ currentPort = strtol(DEFAULT_SERVER_PORT, NULL, 10);
+               }
+
+               port_counter++;
+                       snprintf(newPort, 6,  "%d", (currentPort+1));

- fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Starting discovery server
on port %s...", port);
+ fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "Error while starting discovery server on port %s - retrying on port %s...", port,
newPort);
+                       port = newPort;
+               }
+
+       } while(((*server)->ctx == NULL) && (port_counter <
MAX_NUMBER_OF_RESTARTS));
+
+       return status;
+}
+
+celix_status_t
endpointDiscoveryServer_getUrl(endpoint_discovery_server_pt server, char*
url)
+{
+       celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+       if (server->ip && server->port && server->path) {
+ sprintf(url, "http://%s:%s/%s";, server->ip, server->port,
server->path);
+               status = CELIX_SUCCESS;
+       }

        return status;
 }
@@ -127,6 +196,9 @@ celix_status_t endpointDiscoveryServer_d
        status = celixThreadMutex_destroy(&server->serverLock);

        free((void*) server->path);
+       free((void*) server->port);
+       free((void*) server->ip);
+
        free(server);

        return status;
@@ -308,3 +380,34 @@ static int endpointDiscoveryServer_callb

        return status;
 }
+
+static celix_status_t endpointDiscoveryServer_getIpAdress(char*
interface, char** ip) {
+       celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+       struct ifaddrs *ifaddr, *ifa;
+    char host[NI_MAXHOST];
+
+    if (getifaddrs(&ifaddr) != -1)
+    {
+ for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS;
ifa = ifa->ifa_next)
+               {
+                       if (ifa->ifa_addr == NULL)
+                               continue;
+
+                       if ((getnameinfo(ifa->ifa_addr,sizeof(struct
sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) &&
(ifa->ifa_addr->sa_family == AF_INET)) {
+                               if (interface == NULL) {
+                                       *ip = strdup(host);
+                                       status = CELIX_SUCCESS;
+                               }
+ else if (strcmp(ifa->ifa_name, interface)
== 0) {
+                                       *ip = strdup(host);
+                                       status = CELIX_SUCCESS;
+                               }
+                       }
+               }
+
+               freeifaddrs(ifaddr);
+    }
+
+    return status;
+}

Modified:
celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h?rev=1632567&r1=1632566&r2=1632567&view=diff

==============================================================================
---
celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h
(original)
+++
celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h
Fri Oct 17 13:05:20 2014
@@ -37,7 +37,7 @@
 #include "endpoint_discovery_server.h"


-
+#define DEFAULT_SERVER_IP      "127.0.0.1"
 #define DEFAULT_SERVER_PORT "9999"
 #define DEFAULT_SERVER_PATH "/org.apache.celix.discovery.configured"
 #define DEFAULT_POLL_ENDPOINTS "
http://localhost:9999/org.apache.celix.discovery.configured";

Modified:
celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h?rev=1632567&r1=1632566&r2=1632567&view=diff

==============================================================================
---
celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h
(original)
+++
celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h
Fri Oct 17 13:05:20 2014
@@ -38,9 +38,11 @@
 #include "etcd_watcher.h"


+#define DEFAULT_SERVER_IP      "127.0.0.1"
 #define DEFAULT_SERVER_PORT "9999"
 #define DEFAULT_SERVER_PATH "/org.apache.celix.discovery.etcd"
-#define DEFAULT_POLL_ENDPOINTS "
http://localhost:9999/org.apache.celix.discovery.etcd";
+
+#define DEFAULT_POLL_ENDPOINTS ""

 #define MAX_ROOTNODE_LENGTH             64
 #define MAX_LOCALNODE_LENGTH   256

Modified:
celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h?rev=1632567&r1=1632566&r2=1632567&view=diff

==============================================================================
---
celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
(original)
+++
celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
Fri Oct 17 13:05:20 2014
@@ -33,7 +33,7 @@

 typedef struct etcd_watcher *etcd_watcher_pt;

-celix_status_t etcdWatcher_create(endpoint_discovery_poller_pt poller,
bundle_context_pt context, etcd_watcher_pt *watcher);
+celix_status_t etcdWatcher_create(discovery_pt discovery,
bundle_context_pt context, etcd_watcher_pt *watcher);
 celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher);



Modified: celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/src/etcd.c?rev=1632567&r1=1632566&r2=1632567&view=diff

==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
(original)
+++ celix/trunk/remote_services/discovery_etcd/private/src/etcd.c Fri Oct
17 13:05:20 2014
@@ -103,7 +103,7 @@ bool etcd_get(char* key, char* value, ch
        } else if ((js_root = json_loads(reply.memory, 0, &error)) ==
NULL) {
                printf("error while parsing json data\n");
} else if ((js_node = json_object_get(js_root, ETCD_JSON_NODE)) ==
NULL) {
- printf("error while retrieving expected node object\n"); + printf("error while retrieving expected node object %s\n",
json_dumps(js_root, 0));
} else if (((js_value = json_object_get(js_node, ETCD_JSON_VALUE))
== NULL) || ((js_value = json_object_get(js_node, ETCD_JSON_VALUE)) ==
NULL) || ((js_modifiedIndex = json_object_get(js_node,
ETCD_JSON_MODIFIEDINDEX)) == NULL)) {
                printf("error while retrieving expected objects\n");
        }
@@ -208,7 +208,7 @@ bool etcd_set(char* key, char* value, in
        } else if ((js_root = json_loads(reply.memory, 0, &error)) ==
NULL) {
                printf("error while parsing json data\n");
} else if ((js_node = json_object_get(js_root, ETCD_JSON_NODE)) ==
NULL) {
- printf("error while retrieving expected node object\n"); + printf("error while retrieving expected node object %s\n",
json_dumps(js_root, 0));
} else if ((js_value = json_object_get(js_node, ETCD_JSON_VALUE))
== NULL) {
printf("error while retrieving expected value object\n");
        } else if (json_is_string(js_value)) {
@@ -247,7 +247,7 @@ bool etcd_del(char* key) {
        } else if ((js_root = json_loads(reply.memory, 0, &error)) ==
NULL) {
                printf("error while parsing json data\n");
} else if ((js_node = json_object_get(js_root, ETCD_JSON_NODE)) ==
NULL) {
- printf("error while retrieving expected node object\n"); + printf("error while retrieving expected node object %s\n",
json_dumps(js_root, 0));
        } else {
                retVal = true;
        }
@@ -277,10 +277,10 @@ bool etcd_watch(char* key, int index, ch
        reply.size = 0; /* no data at this point */

        if (index != 0)
- snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&waitIndex=%d";,
etcd_server, etcd_port, key,
+ snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d";,
etcd_server, etcd_port, key,
                                index);
        else
- snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true";,
etcd_server, etcd_port, key);
+ snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true";,
etcd_server, etcd_port, key);

res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*)
&reply);

@@ -290,18 +290,24 @@ bool etcd_watch(char* key, int index, ch
                printf("error while performing curl w/ %s\n", url);
        } else if ((js_root = json_loads(reply.memory, 0, &error)) ==
NULL) {
                printf("error while parsing json data\n");
-       } else if (((js_action = json_object_get(js_root,
ETCD_JSON_ACTION)) == NULL) ||
-                       ((js_node = json_object_get(js_root,
ETCD_JSON_NODE)) == NULL) ||
-                       ((js_prevNode = json_object_get(js_root,
ETCD_JSON_PREVNODE)) == NULL)) {
- printf("error while retrieving expected node object\n"); - } else if (((js_value = json_object_get(js_node, ETCD_JSON_VALUE))
== NULL) ||
-                       ((js_prevValue = json_object_get(js_prevNode,
ETCD_JSON_VALUE)) == NULL)) {
- printf("error while retrieving expected value objects\n");
-       } else if (json_is_string(js_value) &&
json_is_string(js_prevValue) && json_is_string(js_action)) {
-               strncpy(value, json_string_value(js_value),
MAX_VALUE_LENGTH);
-               strncpy(prevValue, json_string_value(js_prevValue),
MAX_VALUE_LENGTH);
-               strncpy(action, json_string_value(js_action),
MAX_ACTION_LENGTH);
-               retVal = true;
+       } else {
+ js_action = json_object_get(js_root, ETCD_JSON_ACTION);
+               js_node = json_object_get(js_root, ETCD_JSON_NODE);
+ js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
+
+               if (js_action == NULL || js_node == NULL) {
+                       printf("error while retrieving expected node
object %s\n", json_dumps(js_root, 0));
+               } else if ((js_value = json_object_get(js_node,
ETCD_JSON_VALUE)) == NULL) {
+                       printf("error while retrieving expected value
objects\n");
+               }
+               else if (json_is_string(js_value) &&
json_is_string(js_action)) {
+                       if ((js_prevNode != NULL) && ((js_prevValue =
json_object_get(js_prevNode, ETCD_JSON_VALUE)) != NULL) &&
(json_is_string(js_prevValue))) {
+                               strncpy(prevValue,
json_string_value(js_prevValue), MAX_VALUE_LENGTH);
+                       }
+                       strncpy(value, json_string_value(js_value),
MAX_VALUE_LENGTH);
+                       strncpy(action, json_string_value(js_action),
MAX_ACTION_LENGTH);
+                       retVal = true;
+               }
        }

        if (reply.memory) {

Modified:
celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c?rev=1632567&r1=1632566&r2=1632567&view=diff

==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
(original)
+++ celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
Fri Oct 17 13:05:20 2014
@@ -145,25 +145,31 @@ static celix_status_t etcdWatcher_addAlr
 }


-static celix_status_t etcdWatcher_addOwnFramework(bundle_context_pt
context)
+static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt watcher)
 {
     celix_status_t status = CELIX_BUNDLE_EXCEPTION;
     char localNodePath[MAX_LOCALNODE_LENGTH];
     char value[MAX_VALUE_LENGTH];
     char action[MAX_VALUE_LENGTH];
+       char url[MAX_VALUE_LENGTH];
     int modIndex;
     char* endpoints = NULL;
     char* ttlStr = NULL;
     int ttl;

+       bundle_context_pt context = watcher->discovery->context;
+ endpoint_discovery_server_pt server = watcher->discovery->server;
+
     // register own framework
     if ((status = etcdWatcher_getLocalNodePath(context,
&localNodePath[0])) != CELIX_SUCCESS) {
         return status;
     }

-    if ((bundleContext_getProperty(context, DISCOVERY_POLL_ENDPOINTS,
&endpoints) != CELIX_SUCCESS) || !endpoints) {
-        endpoints = DEFAULT_POLL_ENDPOINTS;
-    }
+       if (endpointDiscoveryServer_getUrl(server, &url[0]) !=
CELIX_SUCCESS) {
+               snprintf(url, MAX_VALUE_LENGTH, "http://%s:%s/%s";,
DEFAULT_SERVER_IP, DEFAULT_SERVER_PORT, DEFAULT_SERVER_PATH);
+       }
+
+       endpoints = &url[0];

     if ((bundleContext_getProperty(context, CFG_ETCD_TTL, &ttlStr) !=
CELIX_SUCCESS) || !ttlStr) {
         ttl = DEFAULT_ETCD_TTL;
@@ -212,19 +218,21 @@ static void* etcdWatcher_run(void* data)
                char preValue[MAX_VALUE_LENGTH];
                char action[MAX_ACTION_LENGTH];

- if (etcd_watch(rootPath, highestModified + 1, &action[0],
&preValue[0], &value[0]) == true) {
+               if (etcd_watch(rootPath, 0, &action[0], &preValue[0],
&value[0]) == true) {

                        if (strcmp(action, "set") == 0) {
-
endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, &preValue[0]);

endpointDiscoveryPoller_addDiscoveryEndpoint(poller, &value[0]);
                        } else if (strcmp(action, "delete") == 0) {

endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, &preValue[0]);
+                       } else if (strcmp(action, "update") == 0) {
+                               // TODO
                        } else {
fw_log(logger, OSGI_FRAMEWORK_LOG_INFO,
"Unexpected action: %s", action);
                        }
                }
+
                // update own framework uuid in any case;
-           etcdWatcher_addOwnFramework(context);
+           etcdWatcher_addOwnFramework(watcher);
        }

        return NULL;
@@ -278,7 +286,7 @@ celix_status_t etcdWatcher_create(discov
                return CELIX_BUNDLE_EXCEPTION;
        }

-       etcdWatcher_addOwnFramework(context);
+       etcdWatcher_addOwnFramework((*watcher));

if ((status = celixThreadMutex_create(&(*watcher)->watcherLock,
NULL)) != CELIX_SUCCESS) {
                return status;



Reply via email to