Author: bpetri
Date: Fri Oct 17 13:05:20 2014
New Revision: 1632567
URL: http://svn.apache.org/r1632567
Log:
CELIX-169: Add port-collision auto-correction to discovery
Modified:
celix/trunk/remote_services/discovery/private/include/discovery.h
celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h
celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c
celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h
celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h
celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
Modified:
celix/trunk/remote_services/discovery/private/include/discovery.h
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/include/discovery.h?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
--- celix/trunk/remote_services/discovery/private/include/discovery.h
(original)
+++ celix/trunk/remote_services/discovery/private/include/discovery.h
Fri
Oct 17 13:05:20 2014
@@ -33,6 +33,8 @@
#include "endpoint_description.h"
#include "endpoint_listener.h"
+#define DISCOVERY_SERVER_INTERFACE
"DISCOVERY_CFG_SERVER_INTERFACE"
+#define DISCOVERY_SERVER_IP "DISCOVERY_CFG_SERVER_IP"
#define DISCOVERY_SERVER_PORT "DISCOVERY_CFG_SERVER_PORT"
#define DISCOVERY_SERVER_PATH "DISCOVERY_CFG_SERVER_PATH"
#define DISCOVERY_POLL_ENDPOINTS "DISCOVERY_CFG_POLL_ENDPOINTS"
Modified:
celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
---
celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h
(original)
+++
celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h
Fri Oct 17 13:05:20 2014
@@ -68,4 +68,14 @@ celix_status_t endpointDiscoveryServer_a
*/
celix_status_t endpointDiscoveryServer_removeEndpoint(
endpoint_discovery_server_pt server, endpoint_description_pt
endpoint);
+/**
+ * Removes the url, which is used by the discovery server to announce
the
endpoints
+ *
+ * @param server [in] the endpoint discovery server to retrieve the
url
from
+ * @param url [out] url which is used to announce the endpoints.
+ * @return CELIX_SUCCESS when successful.
+ */
+celix_status_t
endpointDiscoveryServer_getUrl(endpoint_discovery_server_pt server,
char*
url);
+
+
#endif /* ENDPOINT_DISCOVERY_SERVER_H_ */
Modified:
celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
---
celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c
(original)
+++
celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c
Fri Oct 17 13:05:20 2014
@@ -25,7 +25,10 @@
*/
#include <stdlib.h>
#include <stdint.h>
-
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <ifaddrs.h>
#include "civetweb.h"
#include "celix_errno.h"
#include "utils.h"
@@ -36,7 +39,8 @@
#include "endpoint_descriptor_writer.h"
#include "endpoint_discovery_server.h"
-
+// defines how often the webserver is restarted (with an increased
port
number)
+#define MAX_NUMBER_OF_RESTARTS 5
#define DEFAULT_SERVER_THREADS "1"
#define CIVETWEB_REQUEST_NOT_HANDLED 0
@@ -54,16 +58,23 @@ struct endpoint_discovery_server {
celix_thread_mutex_t serverLock;
const char* path;
+ const char *port;
+ const char* ip;
struct mg_context* ctx;
};
// Forward declarations...
static int endpointDiscoveryServer_callback(struct mg_connection
*conn);
static char* format_path(char* path);
+static celix_status_t endpointDiscoveryServer_getIpAdress(char*
interface, char** ip);
celix_status_t endpointDiscoveryServer_create(discovery_pt discovery,
bundle_context_pt context, endpoint_discovery_server_pt *server) {
celix_status_t status = CELIX_SUCCESS;
+ char *port = 0;
+ char *ip = NULL;
+ char *path = NULL;
+
*server = malloc(sizeof(struct endpoint_discovery_server));
if (!*server) {
return CELIX_ENOMEM;
@@ -79,13 +90,34 @@ celix_status_t endpointDiscoveryServer_c
return CELIX_BUNDLE_EXCEPTION;
}
- char *port = NULL;
+ bundleContext_getProperty(context, DISCOVERY_SERVER_IP, &ip);
+ if (ip == NULL) {
+ char *interface = NULL;
+
+ bundleContext_getProperty(context,
DISCOVERY_SERVER_INTERFACE, &interface);
+ if ((interface != NULL) &&
(endpointDiscoveryServer_getIpAdress(interface, &ip) !=
CELIX_SUCCESS)) {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING,
"Could
not retrieve IP adress for interface %s", interface);
+ }
+
+ if (ip == NULL) {
+ endpointDiscoveryServer_getIpAdress(NULL,
&ip);
+ }
+ }
+
+ if (ip != NULL) {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Using %s for
service annunciation", ip);
+ (*server)->ip = strdup(ip);
+ }
+ else {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "No IP
address
for service annunciation set. Using %s", DEFAULT_SERVER_IP);
+ (*server)->ip = (char*) DEFAULT_SERVER_IP;
+ }
+
bundleContext_getProperty(context, DISCOVERY_SERVER_PORT,
&port);
if (port == NULL) {
port = DEFAULT_SERVER_PORT;
}
- char *path = NULL;
bundleContext_getProperty(context, DISCOVERY_SERVER_PATH,
&path);
if (path == NULL) {
path = DEFAULT_SERVER_PATH;
@@ -93,19 +125,56 @@ celix_status_t endpointDiscoveryServer_c
(*server)->path = format_path(path);
- const char *options[] = {
- "listening_ports", port,
- "num_threads", DEFAULT_SERVER_THREADS,
- NULL
- };
-
const struct mg_callbacks callbacks = {
.begin_request = endpointDiscoveryServer_callback,
};
- (*server)->ctx = mg_start(&callbacks, (*server), options);
+ unsigned int port_counter = 0;
+
+ do {
+ const char *options[] = {
+ "listening_ports", port,
+ "num_threads", DEFAULT_SERVER_THREADS,
+ NULL
+ };
+
+ (*server)->ctx = mg_start(&callbacks, (*server),
options);
+
+ if ((*server)->ctx != NULL)
+ {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_INFO,
"Starting
discovery server on port %s...", port);
+ (*server)->port = port;
+ }
+ else {
+ errno = 0;
+ char* newPort = calloc(10, sizeof(*newPort));
+ char* endptr = port;
+ int currentPort = strtol(port, &endptr, 10);
+
+ if (*endptr || errno != 0) {
+ currentPort = strtol(DEFAULT_SERVER_PORT, NULL,
10);
+ }
+
+ port_counter++;
+ snprintf(newPort, 6, "%d", (currentPort+1));
- fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Starting discovery
server
on port %s...", port);
+ fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR,
"Error
while starting discovery server on port %s - retrying on port %s...",
port,
newPort);
+ port = newPort;
+ }
+
+ } while(((*server)->ctx == NULL) && (port_counter <
MAX_NUMBER_OF_RESTARTS));
+
+ return status;
+}
+
+celix_status_t
endpointDiscoveryServer_getUrl(endpoint_discovery_server_pt server,
char*
url)
+{
+ celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+ if (server->ip && server->port && server->path) {
+ sprintf(url, "http://%s:%s/%s", server->ip,
server->port,
server->path);
+ status = CELIX_SUCCESS;
+ }
return status;
}
@@ -127,6 +196,9 @@ celix_status_t endpointDiscoveryServer_d
status = celixThreadMutex_destroy(&server->serverLock);
free((void*) server->path);
+ free((void*) server->port);
+ free((void*) server->ip);
+
free(server);
return status;
@@ -308,3 +380,34 @@ static int endpointDiscoveryServer_callb
return status;
}
+
+static celix_status_t endpointDiscoveryServer_getIpAdress(char*
interface, char** ip) {
+ celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+ struct ifaddrs *ifaddr, *ifa;
+ char host[NI_MAXHOST];
+
+ if (getifaddrs(&ifaddr) != -1)
+ {
+ for (ifa = ifaddr; ifa != NULL && status !=
CELIX_SUCCESS;
ifa = ifa->ifa_next)
+ {
+ if (ifa->ifa_addr == NULL)
+ continue;
+
+ if ((getnameinfo(ifa->ifa_addr,sizeof(struct
sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) &&
(ifa->ifa_addr->sa_family == AF_INET)) {
+ if (interface == NULL) {
+ *ip = strdup(host);
+ status = CELIX_SUCCESS;
+ }
+ else if (strcmp(ifa->ifa_name,
interface)
== 0) {
+ *ip = strdup(host);
+ status = CELIX_SUCCESS;
+ }
+ }
+ }
+
+ freeifaddrs(ifaddr);
+ }
+
+ return status;
+}
Modified:
celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
---
celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h
(original)
+++
celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h
Fri Oct 17 13:05:20 2014
@@ -37,7 +37,7 @@
#include "endpoint_discovery_server.h"
-
+#define DEFAULT_SERVER_IP "127.0.0.1"
#define DEFAULT_SERVER_PORT "9999"
#define DEFAULT_SERVER_PATH "/org.apache.celix.discovery.configured"
#define DEFAULT_POLL_ENDPOINTS "
http://localhost:9999/org.apache.celix.discovery.configured"
Modified:
celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
---
celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h
(original)
+++
celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h
Fri Oct 17 13:05:20 2014
@@ -38,9 +38,11 @@
#include "etcd_watcher.h"
+#define DEFAULT_SERVER_IP "127.0.0.1"
#define DEFAULT_SERVER_PORT "9999"
#define DEFAULT_SERVER_PATH "/org.apache.celix.discovery.etcd"
-#define DEFAULT_POLL_ENDPOINTS "
http://localhost:9999/org.apache.celix.discovery.etcd"
+
+#define DEFAULT_POLL_ENDPOINTS ""
#define MAX_ROOTNODE_LENGTH 64
#define MAX_LOCALNODE_LENGTH 256
Modified:
celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
---
celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
(original)
+++
celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
Fri Oct 17 13:05:20 2014
@@ -33,7 +33,7 @@
typedef struct etcd_watcher *etcd_watcher_pt;
-celix_status_t etcdWatcher_create(endpoint_discovery_poller_pt
poller,
bundle_context_pt context, etcd_watcher_pt *watcher);
+celix_status_t etcdWatcher_create(discovery_pt discovery,
bundle_context_pt context, etcd_watcher_pt *watcher);
celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher);
Modified:
celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/src/etcd.c?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
(original)
+++ celix/trunk/remote_services/discovery_etcd/private/src/etcd.c Fri
Oct
17 13:05:20 2014
@@ -103,7 +103,7 @@ bool etcd_get(char* key, char* value, ch
} else if ((js_root = json_loads(reply.memory, 0, &error)) ==
NULL) {
printf("error while parsing json data\n");
} else if ((js_node = json_object_get(js_root,
ETCD_JSON_NODE)) ==
NULL) {
- printf("error while retrieving expected node
object\n");
+ printf("error while retrieving expected node object
%s\n",
json_dumps(js_root, 0));
} else if (((js_value = json_object_get(js_node,
ETCD_JSON_VALUE))
== NULL) || ((js_value = json_object_get(js_node, ETCD_JSON_VALUE)) ==
NULL) || ((js_modifiedIndex = json_object_get(js_node,
ETCD_JSON_MODIFIEDINDEX)) == NULL)) {
printf("error while retrieving expected objects\n");
}
@@ -208,7 +208,7 @@ bool etcd_set(char* key, char* value, in
} else if ((js_root = json_loads(reply.memory, 0, &error)) ==
NULL) {
printf("error while parsing json data\n");
} else if ((js_node = json_object_get(js_root,
ETCD_JSON_NODE)) ==
NULL) {
- printf("error while retrieving expected node
object\n");
+ printf("error while retrieving expected node object
%s\n",
json_dumps(js_root, 0));
} else if ((js_value = json_object_get(js_node,
ETCD_JSON_VALUE))
== NULL) {
printf("error while retrieving expected value
object\n");
} else if (json_is_string(js_value)) {
@@ -247,7 +247,7 @@ bool etcd_del(char* key) {
} else if ((js_root = json_loads(reply.memory, 0, &error)) ==
NULL) {
printf("error while parsing json data\n");
} else if ((js_node = json_object_get(js_root,
ETCD_JSON_NODE)) ==
NULL) {
- printf("error while retrieving expected node
object\n");
+ printf("error while retrieving expected node object
%s\n",
json_dumps(js_root, 0));
} else {
retVal = true;
}
@@ -277,10 +277,10 @@ bool etcd_watch(char* key, int index, ch
reply.size = 0; /* no data at this point */
if (index != 0)
- snprintf(url, MAX_URL_LENGTH,
"http://%s:%d/v2/keys/%s?wait=true&waitIndex=%d",
etcd_server, etcd_port, key,
+ snprintf(url, MAX_URL_LENGTH,
"http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d",
etcd_server, etcd_port, key,
index);
else
- snprintf(url, MAX_URL_LENGTH,
"http://%s:%d/v2/keys/%s?wait=true",
etcd_server, etcd_port, key);
+ snprintf(url, MAX_URL_LENGTH,
"http://%s:%d/v2/keys/%s?wait=true&recursive=true",
etcd_server, etcd_port, key);
res = performRequest(url, GET, WriteMemoryCallback, NULL,
(void*)
&reply);
@@ -290,18 +290,24 @@ bool etcd_watch(char* key, int index, ch
printf("error while performing curl w/ %s\n", url);
} else if ((js_root = json_loads(reply.memory, 0, &error)) ==
NULL) {
printf("error while parsing json data\n");
- } else if (((js_action = json_object_get(js_root,
ETCD_JSON_ACTION)) == NULL) ||
- ((js_node = json_object_get(js_root,
ETCD_JSON_NODE)) == NULL) ||
- ((js_prevNode = json_object_get(js_root,
ETCD_JSON_PREVNODE)) == NULL)) {
- printf("error while retrieving expected node
object\n");
- } else if (((js_value = json_object_get(js_node,
ETCD_JSON_VALUE))
== NULL) ||
- ((js_prevValue = json_object_get(js_prevNode,
ETCD_JSON_VALUE)) == NULL)) {
- printf("error while retrieving expected value
objects\n");
- } else if (json_is_string(js_value) &&
json_is_string(js_prevValue) && json_is_string(js_action)) {
- strncpy(value, json_string_value(js_value),
MAX_VALUE_LENGTH);
- strncpy(prevValue, json_string_value(js_prevValue),
MAX_VALUE_LENGTH);
- strncpy(action, json_string_value(js_action),
MAX_ACTION_LENGTH);
- retVal = true;
+ } else {
+ js_action = json_object_get(js_root,
ETCD_JSON_ACTION);
+ js_node = json_object_get(js_root, ETCD_JSON_NODE);
+ js_prevNode = json_object_get(js_root,
ETCD_JSON_PREVNODE);
+
+ if (js_action == NULL || js_node == NULL) {
+ printf("error while retrieving expected node
object %s\n", json_dumps(js_root, 0));
+ } else if ((js_value = json_object_get(js_node,
ETCD_JSON_VALUE)) == NULL) {
+ printf("error while retrieving expected value
objects\n");
+ }
+ else if (json_is_string(js_value) &&
json_is_string(js_action)) {
+ if ((js_prevNode != NULL) && ((js_prevValue =
json_object_get(js_prevNode, ETCD_JSON_VALUE)) != NULL) &&
(json_is_string(js_prevValue))) {
+ strncpy(prevValue,
json_string_value(js_prevValue), MAX_VALUE_LENGTH);
+ }
+ strncpy(value, json_string_value(js_value),
MAX_VALUE_LENGTH);
+ strncpy(action, json_string_value(js_action),
MAX_ACTION_LENGTH);
+ retVal = true;
+ }
}
if (reply.memory) {
Modified:
celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
URL:
http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
---
celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
(original)
+++
celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
Fri Oct 17 13:05:20 2014
@@ -145,25 +145,31 @@ static celix_status_t etcdWatcher_addAlr
}
-static celix_status_t etcdWatcher_addOwnFramework(bundle_context_pt
context)
+static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt
watcher)
{
celix_status_t status = CELIX_BUNDLE_EXCEPTION;
char localNodePath[MAX_LOCALNODE_LENGTH];
char value[MAX_VALUE_LENGTH];
char action[MAX_VALUE_LENGTH];
+ char url[MAX_VALUE_LENGTH];
int modIndex;
char* endpoints = NULL;
char* ttlStr = NULL;
int ttl;
+ bundle_context_pt context = watcher->discovery->context;
+ endpoint_discovery_server_pt server =
watcher->discovery->server;
+
// register own framework
if ((status = etcdWatcher_getLocalNodePath(context,
&localNodePath[0])) != CELIX_SUCCESS) {
return status;
}
- if ((bundleContext_getProperty(context, DISCOVERY_POLL_ENDPOINTS,
&endpoints) != CELIX_SUCCESS) || !endpoints) {
- endpoints = DEFAULT_POLL_ENDPOINTS;
- }
+ if (endpointDiscoveryServer_getUrl(server, &url[0]) !=
CELIX_SUCCESS) {
+ snprintf(url, MAX_VALUE_LENGTH, "http://%s:%s/%s",
DEFAULT_SERVER_IP, DEFAULT_SERVER_PORT, DEFAULT_SERVER_PATH);
+ }
+
+ endpoints = &url[0];
if ((bundleContext_getProperty(context, CFG_ETCD_TTL, &ttlStr) !=
CELIX_SUCCESS) || !ttlStr) {
ttl = DEFAULT_ETCD_TTL;
@@ -212,19 +218,21 @@ static void* etcdWatcher_run(void* data)
char preValue[MAX_VALUE_LENGTH];
char action[MAX_ACTION_LENGTH];
- if (etcd_watch(rootPath, highestModified + 1,
&action[0],
&preValue[0], &value[0]) == true) {
+ if (etcd_watch(rootPath, 0, &action[0], &preValue[0],
&value[0]) == true) {
if (strcmp(action, "set") == 0) {
-
endpointDiscoveryPoller_removeDiscoveryEndpoint(poller,
&preValue[0]);
endpointDiscoveryPoller_addDiscoveryEndpoint(poller, &value[0]);
} else if (strcmp(action, "delete") == 0) {
endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, &preValue[0]);
+ } else if (strcmp(action, "update") == 0) {
+ // TODO
} else {
fw_log(logger,
OSGI_FRAMEWORK_LOG_INFO,
"Unexpected action: %s", action);
}
}
+
// update own framework uuid in any case;
- etcdWatcher_addOwnFramework(context);
+ etcdWatcher_addOwnFramework(watcher);
}
return NULL;
@@ -278,7 +286,7 @@ celix_status_t etcdWatcher_create(discov
return CELIX_BUNDLE_EXCEPTION;
}
- etcdWatcher_addOwnFramework(context);
+ etcdWatcher_addOwnFramework((*watcher));
if ((status =
celixThreadMutex_create(&(*watcher)->watcherLock,
NULL)) != CELIX_SUCCESS) {
return status;