http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/CMakeLists.txt b/pubsub/pubsub_discovery/CMakeLists.txt index 948da3e..0e7d6c5 100644 --- a/pubsub/pubsub_discovery/CMakeLists.txt +++ b/pubsub/pubsub_discovery/CMakeLists.txt @@ -18,26 +18,25 @@ find_package(CURL REQUIRED) find_package(Jansson REQUIRED) -include_directories("${CURL_INCLUDE_DIR}") -include_directories("${JANSSON_INCLUDE_DIR}") -include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include") -include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub") -include_directories("${PROJECT_SOURCE_DIR}/etcdlib/public/include") -include_directories("private/include") -include_directories("public/include") - add_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery BUNDLE_SYMBOLICNAME "apache_celix_pubsub_discovery_etcd" VERSION "1.0.0" SOURCES - private/src/psd_activator.c - private/src/pubsub_discovery_impl.c - private/src/etcd_common.c - private/src/etcd_watcher.c - private/src/etcd_writer.c + src/psd_activator.c + src/pubsub_discovery_impl.c + src/etcd_common.c + src/etcd_watcher.c + src/etcd_writer.c ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c ) -target_link_libraries(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery celix_framework celix_utils etcdlib_static ${CURL_LIBRARIES} ${JANSSON_LIBRARIES}) +target_include_directories(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery PRIVATE + src + include + ${CURL_INCLUDE_DIR} + ${JANSSON_INCLUDE_DIR} + ) + +target_link_libraries(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery PRIVATE Celix::framework Celix::etcdlib_static ${CURL_LIBRARIES} ${JANSSON_LIBRARIES}) install_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery)
http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/include/etcd_common.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/private/include/etcd_common.h b/pubsub/pubsub_discovery/private/include/etcd_common.h deleted file mode 100644 index 7a3e7b6..0000000 --- a/pubsub/pubsub_discovery/private/include/etcd_common.h +++ /dev/null @@ -1,28 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. 
- */ - -#ifndef ETCD_COMMON_H_ -#define ETCD_COMMON_H_ - -#include "bundle_context.h" -#include "celix_errno.h" - -celix_status_t etcdCommon_init(bundle_context_pt context); - -#endif /* ETCD_COMMON_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/include/etcd_watcher.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/private/include/etcd_watcher.h b/pubsub/pubsub_discovery/private/include/etcd_watcher.h deleted file mode 100644 index c425e60..0000000 --- a/pubsub/pubsub_discovery/private/include/etcd_watcher.h +++ /dev/null @@ -1,38 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. 
- */ - -#ifndef ETCD_WATCHER_H_ -#define ETCD_WATCHER_H_ - -#include "bundle_context.h" -#include "celix_errno.h" - -#include "pubsub_discovery.h" -#include "pubsub_endpoint.h" - -typedef struct etcd_watcher *etcd_watcher_pt; - -celix_status_t etcdWatcher_create(pubsub_discovery_pt discovery, bundle_context_pt context, const char *scope, const char* topic, etcd_watcher_pt *watcher); -celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher); -celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher); - -celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt discovery, const char* key, const char* value, pubsub_endpoint_pt* pubEP); - - -#endif /* ETCD_WATCHER_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/include/etcd_writer.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/private/include/etcd_writer.h b/pubsub/pubsub_discovery/private/include/etcd_writer.h deleted file mode 100644 index 3ff98b9..0000000 --- a/pubsub/pubsub_discovery/private/include/etcd_writer.h +++ /dev/null @@ -1,39 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. 
- */ - -#ifndef ETCD_WRITER_H_ -#define ETCD_WRITER_H_ - -#include "bundle_context.h" -#include "celix_errno.h" - -#include "pubsub_discovery.h" -#include "pubsub_endpoint.h" - -typedef struct etcd_writer *etcd_writer_pt; - - -etcd_writer_pt etcdWriter_create(pubsub_discovery_pt discovery); -void etcdWriter_destroy(etcd_writer_pt writer); - -celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP,bool storeEP); -celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP); - - -#endif /* ETCD_WRITER_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h b/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h deleted file mode 100644 index 676a6ab..0000000 --- a/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h +++ /dev/null @@ -1,72 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. 
- */ - -#ifndef PUBSUB_DISCOVERY_IMPL_H_ -#define PUBSUB_DISCOVERY_IMPL_H_ - -#include "bundle_context.h" -#include "service_reference.h" - -#include "etcd_watcher.h" -#include "etcd_writer.h" -#include "pubsub_endpoint.h" - -#define FREE_MEM(ptr) if(ptr) {free(ptr); ptr = NULL;} - -struct watcher_info { - etcd_watcher_pt watcher; - int nr_references; -}; - -struct pubsub_discovery { - bundle_context_pt context; - - celix_thread_mutex_t discoveredPubsMutex; - hash_map_pt discoveredPubs; //<topic,List<pubsub_endpoint_pt>> - - celix_thread_mutex_t listenerReferencesMutex; - hash_map_pt listenerReferences; //key=serviceReference, value=nop - - celix_thread_mutex_t watchersMutex; - hash_map_pt watchers; //key = topicname, value = struct watcher_info - - etcd_writer_pt writer; -}; - - -celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt* node_discovery); -celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt node_discovery); -celix_status_t pubsub_discovery_start(pubsub_discovery_pt node_discovery); -celix_status_t pubsub_discovery_stop(pubsub_discovery_pt node_discovery); - -celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP); -celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP); - -celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service); -celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service); -celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service); - -celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP); -celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP); -celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic); 
-celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic); - -celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt discovery, pubsub_endpoint_pt endpoint, bool endpointAdded); - -#endif /* PUBSUB_DISCOVERY_IMPL_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/src/etcd_common.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/private/src/etcd_common.c b/pubsub/pubsub_discovery/private/src/etcd_common.c deleted file mode 100644 index c757801..0000000 --- a/pubsub/pubsub_discovery/private/src/etcd_common.c +++ /dev/null @@ -1,82 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. 
- */ - -#include <stdbool.h> -#include <stdlib.h> -#include <unistd.h> -#include <string.h> - -#include "celix_log.h" -#include "constants.h" - -#include <curl/curl.h> -#include "etcd.h" -#include "etcd_watcher.h" - -#include "pubsub_discovery.h" -#include "pubsub_discovery_impl.h" - - -#define MAX_ROOTNODE_LENGTH 128 -#define MAX_LOCALNODE_LENGTH 4096 -#define MAX_FIELD_LENGTH 128 - -#define CFG_ETCD_SERVER_IP "PUBSUB_DISCOVERY_ETCD_SERVER_IP" -#define DEFAULT_ETCD_SERVER_IP "127.0.0.1" - -#define CFG_ETCD_SERVER_PORT "PUBSUB_DISCOVERY_ETCD_SERVER_PORT" -#define DEFAULT_ETCD_SERVER_PORT 2379 - -// be careful - this should be higher than the curl timeout -#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL" -#define DEFAULT_ETCD_TTL 30 - - -celix_status_t etcdCommon_init(bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - const char* etcd_server = NULL; - const char* etcd_port_string = NULL; - int etcd_port = 0; - - if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_IP, &etcd_server) != CELIX_SUCCESS) || !etcd_server) { - etcd_server = DEFAULT_ETCD_SERVER_IP; - } - - if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_PORT, &etcd_port_string) != CELIX_SUCCESS) || !etcd_port_string) { - etcd_port = DEFAULT_ETCD_SERVER_PORT; - } else { - char* endptr = NULL; - errno = 0; - etcd_port = strtol(etcd_port_string, &endptr, 10); - if (*endptr || errno != 0) { - etcd_port = DEFAULT_ETCD_SERVER_PORT; - } - } - - printf("PSD: Using discovery HOST:PORT: %s:%i\n", etcd_server, etcd_port); - - if (etcd_init(etcd_server, etcd_port, CURL_GLOBAL_DEFAULT) != 0) { - status = CELIX_BUNDLE_EXCEPTION; - } else { - status = CELIX_SUCCESS; - } - - return status; -} - http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/src/etcd_watcher.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/private/src/etcd_watcher.c b/pubsub/pubsub_discovery/private/src/etcd_watcher.c deleted 
file mode 100644 index 3c3a5a8..0000000 --- a/pubsub/pubsub_discovery/private/src/etcd_watcher.c +++ /dev/null @@ -1,290 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ - -#include <stdbool.h> -#include <stdlib.h> -#include <unistd.h> -#include <string.h> - -#include "celix_log.h" -#include "constants.h" - -#include "etcd.h" -#include "etcd_watcher.h" - -#include "pubsub_discovery.h" -#include "pubsub_discovery_impl.h" - - - -#define MAX_ROOTNODE_LENGTH 128 -#define MAX_LOCALNODE_LENGTH 4096 -#define MAX_FIELD_LENGTH 128 - -#define CFG_ETCD_ROOT_PATH "PUBSUB_DISCOVERY_ETCD_ROOT_PATH" -#define DEFAULT_ETCD_ROOTPATH "pubsub/discovery" - -#define CFG_ETCD_SERVER_IP "PUBSUB_DISCOVERY_ETCD_SERVER_IP" -#define DEFAULT_ETCD_SERVER_IP "127.0.0.1" - -#define CFG_ETCD_SERVER_PORT "PUBSUB_DISCOVERY_ETCD_SERVER_PORT" -#define DEFAULT_ETCD_SERVER_PORT 2379 - -// be careful - this should be higher than the curl timeout -#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL" -#define DEFAULT_ETCD_TTL 30 - - -struct etcd_watcher { - pubsub_discovery_pt pubsub_discovery; - - celix_thread_mutex_t watcherLock; - celix_thread_t watcherThread; - - char *scope; - char *topic; - volatile bool running; -}; - -struct etcd_writer { - 
pubsub_discovery_pt pubsub_discovery; - celix_thread_mutex_t localPubsLock; - array_list_pt localPubs; - volatile bool running; - celix_thread_t writerThread; -}; - - -// note that the rootNode shouldn't have a leading slash -static celix_status_t etcdWatcher_getTopicRootPath(bundle_context_pt context, const char *scope, const char *topic, char* rootNode, int rootNodeLen) { - celix_status_t status = CELIX_SUCCESS; - const char* rootPath = NULL; - - if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) { - snprintf(rootNode, rootNodeLen, "%s/%s/%s", DEFAULT_ETCD_ROOTPATH, scope, topic); - } else { - snprintf(rootNode, rootNodeLen, "%s/%s/%s", rootPath, scope, topic); - } - - return status; -} - -static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* rootNode) { - celix_status_t status = CELIX_SUCCESS; - const char* rootPath = NULL; - - if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) { - strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH); - } else { - strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH); - } - - return status; -} - - -static void add_node(const char *key, const char *value, void* arg) { - pubsub_discovery_pt ps_discovery = (pubsub_discovery_pt) arg; - pubsub_endpoint_pt pubEP = NULL; - celix_status_t status = etcdWatcher_getPublisherEndpointFromKey(ps_discovery, key, value, &pubEP); - if(!status && pubEP) { - pubsub_discovery_addNode(ps_discovery, pubEP); - } -} - -static celix_status_t etcdWatcher_addAlreadyExistingPublishers(pubsub_discovery_pt ps_discovery, const char *rootPath, long long * highestModified) { - celix_status_t status = CELIX_SUCCESS; - if(etcd_get_directory(rootPath, add_node, ps_discovery, highestModified)) { - status = CELIX_ILLEGAL_ARGUMENT; - } - return status; -} - -// gets everything from provided key -celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt 
pubsub_discovery, const char* etcdKey, const char* etcdValue, pubsub_endpoint_pt* pubEP) { - - celix_status_t status = CELIX_SUCCESS; - - char rootPath[MAX_ROOTNODE_LENGTH]; - char *expr = NULL; - char scope[MAX_FIELD_LENGTH]; - char topic[MAX_FIELD_LENGTH]; - char fwUUID[MAX_FIELD_LENGTH]; - char serviceId[MAX_FIELD_LENGTH]; - - memset(rootPath,0,MAX_ROOTNODE_LENGTH); - memset(topic,0,MAX_FIELD_LENGTH); - memset(fwUUID,0,MAX_FIELD_LENGTH); - memset(serviceId,0,MAX_FIELD_LENGTH); - - etcdWatcher_getRootPath(pubsub_discovery->context, rootPath); - - asprintf(&expr, "/%s/%%[^/]/%%[^/]/%%[^/]/%%[^/].*", rootPath); - if(expr) { - int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, serviceId); - free(expr); - if (foundItems != 4) { // Could happen when a directory is removed, just don't process this. - status = CELIX_ILLEGAL_STATE; - } - else{ - status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,NULL,pubEP); - } - } - return status; -} - -/* - * performs (blocking) etcd_watch calls to check for - * changing discovery endpoint information within etcd. 
- */ -static void* etcdWatcher_run(void* data) { - etcd_watcher_pt watcher = (etcd_watcher_pt) data; - time_t timeBeforeWatch = time(NULL); - char rootPath[MAX_ROOTNODE_LENGTH]; - long long highestModified = 0; - - pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery; - bundle_context_pt context = ps_discovery->context; - - memset(rootPath, 0, MAX_ROOTNODE_LENGTH); - - //TODO: add topic to etcd key - etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH); - etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, &highestModified); - - while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) && watcher->running) { - - char *rkey = NULL; - char *value = NULL; - char *preValue = NULL; - char *action = NULL; - long long modIndex; - - celixThreadMutex_unlock(&watcher->watcherLock); - - if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) { - pubsub_endpoint_pt pubEP = NULL; - if ((strcmp(action, "set") == 0) || (strcmp(action, "create") == 0)) { - if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) { - pubsub_discovery_addNode(ps_discovery, pubEP); - } - } else if (strcmp(action, "delete") == 0) { - if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) { - pubsub_discovery_removeNode(ps_discovery, pubEP); - } - } else if (strcmp(action, "expire") == 0) { - if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) { - pubsub_discovery_removeNode(ps_discovery, pubEP); - } - } else if (strcmp(action, "update") == 0) { - if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) { - pubsub_discovery_addNode(ps_discovery, pubEP); - } - } else { - fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action); - } - highestModified = modIndex; - } else if 
(time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) { - sleep(DEFAULT_ETCD_TTL / 4); - } - - FREE_MEM(action); - FREE_MEM(value); - FREE_MEM(preValue); - FREE_MEM(rkey); - - /* prevent busy waiting, in case etcd_watch returns false */ - - - if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) { - timeBeforeWatch = time(NULL); - } - - } - - if (watcher->running == false) { - celixThreadMutex_unlock(&watcher->watcherLock); - } - - return NULL; -} - -celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_context_pt context, const char *scope, const char *topic, etcd_watcher_pt *watcher) { - celix_status_t status = CELIX_SUCCESS; - - - if (pubsub_discovery == NULL) { - return CELIX_BUNDLE_EXCEPTION; - } - - (*watcher) = calloc(1, sizeof(struct etcd_watcher)); - - if(*watcher == NULL){ - return CELIX_ENOMEM; - } - - (*watcher)->pubsub_discovery = pubsub_discovery; - (*watcher)->scope = strdup(scope); - (*watcher)->topic = strdup(topic); - - - celixThreadMutex_create(&(*watcher)->watcherLock, NULL); - - celixThreadMutex_lock(&(*watcher)->watcherLock); - - status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher); - if (status == CELIX_SUCCESS) { - (*watcher)->running = true; - } - - celixThreadMutex_unlock(&(*watcher)->watcherLock); - - - return status; -} - -celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) { - - celix_status_t status = CELIX_SUCCESS; - - char rootPath[MAX_ROOTNODE_LENGTH]; - etcdWatcher_getTopicRootPath(watcher->pubsub_discovery->context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH); - celixThreadMutex_destroy(&(watcher->watcherLock)); - - free(watcher->scope); - free(watcher->topic); - free(watcher); - - return status; -} - -celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&(watcher->watcherLock)); - watcher->running = false; - celixThreadMutex_unlock(&(watcher->watcherLock)); - - 
celixThread_join(watcher->watcherThread, NULL); - - return status; - -} - - http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/src/etcd_writer.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/private/src/etcd_writer.c b/pubsub/pubsub_discovery/private/src/etcd_writer.c deleted file mode 100644 index 1c423f3..0000000 --- a/pubsub/pubsub_discovery/private/src/etcd_writer.c +++ /dev/null @@ -1,189 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. 
- */ - -#include <stdbool.h> -#include <stdlib.h> -#include <unistd.h> -#include <string.h> - -#include "celix_log.h" -#include "constants.h" - -#include "etcd.h" -#include "etcd_writer.h" - -#include "pubsub_discovery.h" -#include "pubsub_discovery_impl.h" - -#define MAX_ROOTNODE_LENGTH 128 - -#define CFG_ETCD_ROOT_PATH "PUBSUB_DISCOVERY_ETCD_ROOT_PATH" -#define DEFAULT_ETCD_ROOTPATH "pubsub/discovery" - -#define CFG_ETCD_SERVER_IP "PUBSUB_DISCOVERY_ETCD_SERVER_IP" -#define DEFAULT_ETCD_SERVER_IP "127.0.0.1" - -#define CFG_ETCD_SERVER_PORT "PUBSUB_DISCOVERY_ETCD_SERVER_PORT" -#define DEFAULT_ETCD_SERVER_PORT 2379 - -// be careful - this should be higher than the curl timeout -#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL" -#define DEFAULT_ETCD_TTL 30 - -struct etcd_writer { - pubsub_discovery_pt pubsub_discovery; - celix_thread_mutex_t localPubsLock; - array_list_pt localPubs; - volatile bool running; - celix_thread_t writerThread; -}; - - -static const char* etcdWriter_getRootPath(bundle_context_pt context); -static void* etcdWriter_run(void* data); - - -etcd_writer_pt etcdWriter_create(pubsub_discovery_pt disc) { - etcd_writer_pt writer = calloc(1, sizeof(*writer)); - if(writer) { - celixThreadMutex_create(&writer->localPubsLock, NULL); - arrayList_create(&writer->localPubs); - writer->pubsub_discovery = disc; - writer->running = true; - celixThread_create(&writer->writerThread, NULL, etcdWriter_run, writer); - } - return writer; -} - -void etcdWriter_destroy(etcd_writer_pt writer) { - char dir[MAX_ROOTNODE_LENGTH]; - const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context); - - writer->running = false; - celixThread_join(writer->writerThread, NULL); - - celixThreadMutex_lock(&writer->localPubsLock); - for(int i = 0; i < arrayList_size(writer->localPubs); i++) { - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(writer->localPubs,i); - memset(dir,0,MAX_ROOTNODE_LENGTH); - 
snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID); - etcd_del(dir); - pubsubEndpoint_destroy(pubEP); - } - arrayList_destroy(writer->localPubs); - - celixThreadMutex_unlock(&writer->localPubsLock); - celixThreadMutex_destroy(&(writer->localPubsLock)); - - free(writer); -} - -celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP, bool storeEP){ - celix_status_t status = CELIX_BUNDLE_EXCEPTION; - - if(storeEP){ - const char *fwUUID = NULL; - bundleContext_getProperty(writer->pubsub_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); - if(fwUUID && strcmp(pubEP->frameworkUUID, fwUUID) == 0) { - celixThreadMutex_lock(&writer->localPubsLock); - pubsub_endpoint_pt p = NULL; - pubsubEndpoint_clone(pubEP, &p); - arrayList_add(writer->localPubs,p); - celixThreadMutex_unlock(&writer->localPubsLock); - } - } - - char *key; - - const char* ttlStr = NULL; - int ttl = 0; - - // determine ttl - if ((bundleContext_getProperty(writer->pubsub_discovery->context, CFG_ETCD_TTL, &ttlStr) != CELIX_SUCCESS) || !ttlStr) { - ttl = DEFAULT_ETCD_TTL; - } else { - char* endptr = NULL; - errno = 0; - ttl = strtol(ttlStr, &endptr, 10); - if (*endptr || errno != 0) { - ttl = DEFAULT_ETCD_TTL; - } - } - - const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context); - - asprintf(&key,"%s/%s/%s/%s/%ld",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID,pubEP->serviceID); - - if(!etcd_set(key,pubEP->endpoint,ttl,false)){ - status = CELIX_ILLEGAL_ARGUMENT; - } - FREE_MEM(key); - return status; -} - -celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP) { - celix_status_t status = CELIX_SUCCESS; - char *key = NULL; - - const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context); - - asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, pubEP->frameworkUUID, pubEP->serviceID); - - 
celixThreadMutex_lock(&writer->localPubsLock); - for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) { - pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i); - if (pubsubEndpoint_equals(ep, pubEP)) { - arrayList_remove(writer->localPubs, i); - pubsubEndpoint_destroy(ep); - break; - } - } - celixThreadMutex_unlock(&writer->localPubsLock); - - if (etcd_del(key)) { - printf("Failed to remove key %s from ETCD\n",key); - status = CELIX_ILLEGAL_ARGUMENT; - } - FREE_MEM(key); - return status; -} - -static void* etcdWriter_run(void* data) { - etcd_writer_pt writer = (etcd_writer_pt)data; - while(writer->running) { - celixThreadMutex_lock(&writer->localPubsLock); - for(int i=0; i < arrayList_size(writer->localPubs); i++) { - etcdWriter_addPublisherEndpoint(writer,(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i),false); - } - celixThreadMutex_unlock(&writer->localPubsLock); - sleep(DEFAULT_ETCD_TTL / 2); - } - - return NULL; -} - -static const char* etcdWriter_getRootPath(bundle_context_pt context) { - const char* rootPath = NULL; - bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath); - if(rootPath == NULL) { - rootPath = DEFAULT_ETCD_ROOTPATH; - } - return rootPath; -} - http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/src/psd_activator.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/private/src/psd_activator.c b/pubsub/pubsub_discovery/private/src/psd_activator.c deleted file mode 100644 index 89a517d..0000000 --- a/pubsub/pubsub_discovery/private/src/psd_activator.c +++ /dev/null @@ -1,171 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. 
The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -#include "bundle_activator.h" -#include "service_tracker.h" -#include "service_registration.h" -#include "constants.h" -#include "celix_log.h" - -#include "pubsub_common.h" -#include "publisher_endpoint_announce.h" -#include "pubsub_discovery.h" -#include "pubsub_discovery_impl.h" - -struct activator { - bundle_context_pt context; - pubsub_discovery_pt pubsub_discovery; - - service_tracker_pt pstmPublishersTracker; - - publisher_endpoint_announce_pt publisherEPAnnounce; - service_registration_pt publisherEPAnnounceService; -}; - -static celix_status_t createTMPublisherAnnounceTracker(struct activator *activator, service_tracker_pt *tracker) { - celix_status_t status = CELIX_SUCCESS; - - service_tracker_customizer_pt customizer = NULL; - - status = serviceTrackerCustomizer_create(activator->pubsub_discovery, - NULL, - pubsub_discovery_tmPublisherAnnounceAdded, - pubsub_discovery_tmPublisherAnnounceModified, - pubsub_discovery_tmPublisherAnnounceRemoved, - &customizer); - - if (status == CELIX_SUCCESS) { - status = serviceTracker_create(activator->context, (char *) PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE, customizer, tracker); - } - - return status; -} - -celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { - celix_status_t status = CELIX_SUCCESS; - - struct activator* activator = calloc(1, 
sizeof(*activator)); - - if (activator) { - activator->context = context; - activator->pstmPublishersTracker = NULL; - activator->publisherEPAnnounce = NULL; - activator->publisherEPAnnounceService = NULL; - - status = pubsub_discovery_create(context, &activator->pubsub_discovery); - - if (status == CELIX_SUCCESS) { - status = createTMPublisherAnnounceTracker(activator, &(activator->pstmPublishersTracker)); - } - - if (status == CELIX_SUCCESS) { - *userData = activator; - } else { - free(activator); - } - } else { - status = CELIX_ENOMEM; - } - - return status; - -} - -celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - - struct activator *activator = userData; - - publisher_endpoint_announce_pt pubEPAnnouncer = calloc(1, sizeof(*pubEPAnnouncer)); - - if (pubEPAnnouncer) { - - pubEPAnnouncer->handle = activator->pubsub_discovery; - pubEPAnnouncer->announcePublisher = pubsub_discovery_announcePublisher; - pubEPAnnouncer->removePublisher = pubsub_discovery_removePublisher; - pubEPAnnouncer->interestedInTopic = pubsub_discovery_interestedInTopic; - pubEPAnnouncer->uninterestedInTopic = pubsub_discovery_uninterestedInTopic; - activator->publisherEPAnnounce = pubEPAnnouncer; - - properties_pt props = properties_create(); - properties_set(props, "PUBSUB_DISCOVERY", "true"); - - // pubsub_discovery_start needs to be first to initalize the propert etcd_watcher values - status = pubsub_discovery_start(activator->pubsub_discovery); - - if (status == CELIX_SUCCESS) { - status = serviceTracker_open(activator->pstmPublishersTracker); - } - - if (status == CELIX_SUCCESS) { - status = bundleContext_registerService(context, (char *) PUBSUB_DISCOVERY_SERVICE, pubEPAnnouncer, props, &activator->publisherEPAnnounceService); - } - - - } - else{ - status = CELIX_ENOMEM; - } - - if(status!=CELIX_SUCCESS && pubEPAnnouncer!=NULL){ - free(pubEPAnnouncer); - } - - - return status; -} - -celix_status_t 
bundleActivator_stop(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - - status += pubsub_discovery_stop(activator->pubsub_discovery); - - status += serviceTracker_close(activator->pstmPublishersTracker); - - status += serviceRegistration_unregister(activator->publisherEPAnnounceService); - - if (status == CELIX_SUCCESS) { - free(activator->publisherEPAnnounce); - } - - return status; -} - -celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - - status += serviceTracker_destroy(activator->pstmPublishersTracker); - status += pubsub_discovery_destroy(activator->pubsub_discovery); - - activator->publisherEPAnnounce = NULL; - activator->publisherEPAnnounceService = NULL; - activator->pstmPublishersTracker = NULL; - activator->pubsub_discovery = NULL; - activator->context = NULL; - - free(activator); - - return status; -} http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c b/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c deleted file mode 100644 index 94a8e11..0000000 --- a/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c +++ /dev/null @@ -1,457 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ - -#include <stdio.h> -#include <string.h> -#include <stdlib.h> -#include <unistd.h> -#include <stdbool.h> -#include <netdb.h> -#include <netinet/in.h> - -#include "constants.h" -#include "celix_threads.h" -#include "bundle_context.h" -#include "array_list.h" -#include "utils.h" -#include "celix_errno.h" -#include "filter.h" -#include "service_reference.h" -#include "service_registration.h" - -#include "publisher_endpoint_announce.h" -#include "etcd_common.h" -#include "etcd_watcher.h" -#include "etcd_writer.h" -#include "pubsub_endpoint.h" -#include "pubsub_discovery_impl.h" - -/* Discovery activator functions */ -celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt *ps_discovery) { - celix_status_t status = CELIX_SUCCESS; - - *ps_discovery = calloc(1, sizeof(**ps_discovery)); - - if (*ps_discovery == NULL) { - status = CELIX_ENOMEM; - } - else{ - (*ps_discovery)->context = context; - (*ps_discovery)->discoveredPubs = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*ps_discovery)->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL); - (*ps_discovery)->watchers = hashMap_create(utils_stringHash,NULL,utils_stringEquals, NULL); - celixThreadMutex_create(&(*ps_discovery)->listenerReferencesMutex, NULL); - celixThreadMutex_create(&(*ps_discovery)->discoveredPubsMutex, NULL); - celixThreadMutex_create(&(*ps_discovery)->watchersMutex, NULL); - } - - return status; -} - -celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt ps_discovery) { - 
celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex); - - hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->discoveredPubs); - - while (hashMapIterator_hasNext(iter)) { - array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter); - - for(int i=0; i < arrayList_size(pubEP_list); i++) { - pubsubEndpoint_destroy(((pubsub_endpoint_pt)arrayList_get(pubEP_list,i))); - } - arrayList_destroy(pubEP_list); - } - - hashMapIterator_destroy(iter); - - hashMap_destroy(ps_discovery->discoveredPubs, true, false); - ps_discovery->discoveredPubs = NULL; - - celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex); - - celixThreadMutex_destroy(&ps_discovery->discoveredPubsMutex); - - - celixThreadMutex_lock(&ps_discovery->listenerReferencesMutex); - - hashMap_destroy(ps_discovery->listenerReferences, false, false); - ps_discovery->listenerReferences = NULL; - - celixThreadMutex_unlock(&ps_discovery->listenerReferencesMutex); - - celixThreadMutex_destroy(&ps_discovery->listenerReferencesMutex); - - free(ps_discovery); - - return status; -} - -celix_status_t pubsub_discovery_start(pubsub_discovery_pt ps_discovery) { - celix_status_t status = CELIX_SUCCESS; - status = etcdCommon_init(ps_discovery->context); - ps_discovery->writer = etcdWriter_create(ps_discovery); - - return status; -} - -celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) { - celix_status_t status = CELIX_SUCCESS; - - const char* fwUUID = NULL; - - bundleContext_getProperty(ps_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); - if (fwUUID == NULL) { - printf("PSD: Cannot retrieve fwUUID.\n"); - return CELIX_INVALID_BUNDLE_CONTEXT; - } - - celixThreadMutex_lock(&ps_discovery->watchersMutex); - - hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->watchers); - while (hashMapIterator_hasNext(iter)) { - struct watcher_info * wi = hashMapIterator_nextValue(iter); - etcdWatcher_stop(wi->watcher); - } 
- hashMapIterator_destroy(iter); - - celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex); - - /* Unexport all publishers for the local framework, and also delete from ETCD publisher belonging to the local framework */ - - iter = hashMapIterator_create(ps_discovery->discoveredPubs); - while (hashMapIterator_hasNext(iter)) { - array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter); - - int i; - for (i = 0; i < arrayList_size(pubEP_list); i++) { - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) arrayList_get(pubEP_list, i); - if (strcmp(pubEP->frameworkUUID, fwUUID) == 0) { - etcdWriter_deletePublisherEndpoint(ps_discovery->writer, pubEP); - } else { - pubsub_discovery_informPublishersListeners(ps_discovery, pubEP, false); - arrayList_remove(pubEP_list, i); - pubsubEndpoint_destroy(pubEP); - i--; - } - } - } - - hashMapIterator_destroy(iter); - - celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex); - etcdWriter_destroy(ps_discovery->writer); - - iter = hashMapIterator_create(ps_discovery->watchers); - while (hashMapIterator_hasNext(iter)) { - struct watcher_info * wi = hashMapIterator_nextValue(iter); - etcdWatcher_destroy(wi->watcher); - } - hashMapIterator_destroy(iter); - hashMap_destroy(ps_discovery->watchers, true, true); - celixThreadMutex_unlock(&ps_discovery->watchersMutex); - return status; -} - -/* Functions called by the etcd_watcher */ - -celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) { - celix_status_t status = CELIX_SUCCESS; - bool inform=false; - celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); - - char *pubs_key = createScopeTopicKey(pubEP->scope, pubEP->topic); - array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pubs_key); - if(pubEP_list==NULL){ - arrayList_create(&pubEP_list); - arrayList_add(pubEP_list,pubEP); - hashMap_put(pubsub_discovery->discoveredPubs,strdup(pubs_key),pubEP_list); - inform=true; - } - 
else{ - int i; - bool found = false; - for(i=0;i<arrayList_size(pubEP_list) && !found;i++){ - found = pubsubEndpoint_equals(pubEP,(pubsub_endpoint_pt)arrayList_get(pubEP_list,i)); - } - if(found){ - pubsubEndpoint_destroy(pubEP); - } - else{ - arrayList_add(pubEP_list,pubEP); - inform=true; - } - } - free(pubs_key); - - celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); - - if(inform){ - status = pubsub_discovery_informPublishersListeners(pubsub_discovery,pubEP,true); - } - - return status; -} - -celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) { - celix_status_t status = CELIX_SUCCESS; - pubsub_endpoint_pt p = NULL; - bool found = false; - - celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); - char *pubs_key = createScopeTopicKey(pubEP->scope, pubEP->topic); - array_list_pt pubEP_list = (array_list_pt) hashMap_get(pubsub_discovery->discoveredPubs, pubs_key); - free(pubs_key); - if (pubEP_list == NULL) { - printf("PSD: Cannot find any registered publisher for topic %s. 
Something is not consistent.\n", pubEP->topic); - status = CELIX_ILLEGAL_STATE; - } else { - int i; - - for (i = 0; !found && i < arrayList_size(pubEP_list); i++) { - p = arrayList_get(pubEP_list, i); - found = pubsubEndpoint_equals(pubEP, p); - if (found) { - arrayList_remove(pubEP_list, i); - pubsubEndpoint_destroy(p); - } - } - } - - celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); - if (found) { - status = pubsub_discovery_informPublishersListeners(pubsub_discovery, pubEP, false); - } - pubsubEndpoint_destroy(pubEP); - - return status; -} - -/* Callback to the pubsub_topology_manager */ -celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP, bool epAdded) { - celix_status_t status = CELIX_SUCCESS; - - // Inform listeners of new publisher endpoint - celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex); - - if (pubsub_discovery->listenerReferences != NULL) { - hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->listenerReferences); - while (hashMapIterator_hasNext(iter)) { - service_reference_pt reference = hashMapIterator_nextKey(iter); - - publisher_endpoint_announce_pt listener = NULL; - - bundleContext_getService(pubsub_discovery->context, reference, (void**) &listener); - if (epAdded) { - listener->announcePublisher(listener->handle, pubEP); - } else { - listener->removePublisher(listener->handle, pubEP); - } - bundleContext_ungetService(pubsub_discovery->context, reference, NULL); - } - hashMapIterator_destroy(iter); - } - - celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex); - - return status; -} - - -/* Service's functions implementation */ -celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP) { - celix_status_t status = CELIX_SUCCESS; - printf("pubsub_discovery_announcePublisher : %s / %s\n", pubEP->topic, pubEP->endpoint); - pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) 
handle; - - celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); - - char *pub_key = createScopeTopicKey(pubEP->scope,pubEP->topic); - array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key); - - if(pubEP_list==NULL){ - arrayList_create(&pubEP_list); - hashMap_put(pubsub_discovery->discoveredPubs,strdup(pub_key),pubEP_list); - } - free(pub_key); - pubsub_endpoint_pt p = NULL; - pubsubEndpoint_clone(pubEP, &p); - - arrayList_add(pubEP_list,p); - - status = etcdWriter_addPublisherEndpoint(pubsub_discovery->writer,p,true); - - celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); - - return status; -} - -celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP) { - celix_status_t status = CELIX_SUCCESS; - - pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle; - - celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); - - char *pub_key = createScopeTopicKey(pubEP->scope,pubEP->topic); - array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key); - free(pub_key); - if(pubEP_list==NULL){ - printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",pubEP->topic); - status = CELIX_ILLEGAL_STATE; - } - else{ - - int i; - bool found = false; - pubsub_endpoint_pt p = NULL; - - for(i=0;!found && i<arrayList_size(pubEP_list);i++){ - p = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); - found = pubsubEndpoint_equals(pubEP,p); - } - - if(!found){ - printf("PSD: Trying to remove a not existing endpoint. 
Something is not consistent.\n"); - status = CELIX_ILLEGAL_STATE; - } - else{ - - arrayList_removeElement(pubEP_list,p); - - status = etcdWriter_deletePublisherEndpoint(pubsub_discovery->writer,p); - - pubsubEndpoint_destroy(p); - } - } - - celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); - - return status; -} - -celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic) { - pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle; - - char *scope_topic_key = createScopeTopicKey(scope, topic); - celixThreadMutex_lock(&pubsub_discovery->watchersMutex); - struct watcher_info * wi = hashMap_get(pubsub_discovery->watchers, scope_topic_key); - if(wi) { - wi->nr_references++; - free(scope_topic_key); - } else { - wi = calloc(1, sizeof(*wi)); - etcdWatcher_create(pubsub_discovery, pubsub_discovery->context, scope, topic, &wi->watcher); - wi->nr_references = 1; - hashMap_put(pubsub_discovery->watchers, scope_topic_key, wi); - } - - celixThreadMutex_unlock(&pubsub_discovery->watchersMutex); - - return CELIX_SUCCESS; -} - -celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic) { - pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle; - - char *scope_topic_key = createScopeTopicKey(scope, topic); - celixThreadMutex_lock(&pubsub_discovery->watchersMutex); - - hash_map_entry_pt entry = hashMap_getEntry(pubsub_discovery->watchers, scope_topic_key); - if(entry) { - struct watcher_info * wi = hashMapEntry_getValue(entry); - wi->nr_references--; - if(wi->nr_references == 0) { - char *key = hashMapEntry_getKey(entry); - hashMap_remove(pubsub_discovery->watchers, scope_topic_key); - free(key); - free(scope_topic_key); - etcdWatcher_stop(wi->watcher); - etcdWatcher_destroy(wi->watcher); - free(wi); - } - } else { - fprintf(stderr, "[DISC] Inconsistency error: Removing unknown topic %s\n", topic); - } - 
celixThreadMutex_unlock(&pubsub_discovery->watchersMutex); - return CELIX_SUCCESS; -} - -/* pubsub_topology_manager tracker callbacks */ - -celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; - - pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt)handle; - publisher_endpoint_announce_pt listener = (publisher_endpoint_announce_pt)service; - - celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); - celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex); - - /* Notify the PSTM about discovered publisher endpoints */ - hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->discoveredPubs); - while(hashMapIterator_hasNext(iter)){ - array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter); - int i; - for(i=0;i<arrayList_size(pubEP_list);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); - status += listener->announcePublisher(listener->handle, pubEP); - } - } - - hashMapIterator_destroy(iter); - - hashMap_put(pubsub_discovery->listenerReferences, reference, NULL); - - celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex); - celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); - - printf("PSD: pubsub_tm_announce_publisher added.\n"); - - return status; -} - -celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; - - status = pubsub_discovery_tmPublisherAnnounceRemoved(handle, reference, service); - if (status == CELIX_SUCCESS) { - status = pubsub_discovery_tmPublisherAnnounceAdded(handle, reference, service); - } - - return status; -} - -celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; - pubsub_discovery_pt pubsub_discovery 
= handle; - - celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex); - - if (pubsub_discovery->listenerReferences != NULL) { - if (hashMap_remove(pubsub_discovery->listenerReferences, reference)) { - printf("PSD: pubsub_tm_announce_publisher removed.\n"); - } - } - celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex); - - return status; -} - http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/public/include/pubsub_discovery.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/public/include/pubsub_discovery.h b/pubsub/pubsub_discovery/public/include/pubsub_discovery.h deleted file mode 100644 index f77905a..0000000 --- a/pubsub/pubsub_discovery/public/include/pubsub_discovery.h +++ /dev/null @@ -1,26 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. 
- */ - -#ifndef PUBSUB_DISCOVERY_H_ -#define PUBSUB_DISCOVERY_H_ - -typedef struct pubsub_discovery *pubsub_discovery_pt; - - -#endif /* PUBSUB_DISCOVERY_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/etcd_common.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/src/etcd_common.c b/pubsub/pubsub_discovery/src/etcd_common.c new file mode 100644 index 0000000..c757801 --- /dev/null +++ b/pubsub/pubsub_discovery/src/etcd_common.c @@ -0,0 +1,82 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. 
+ */ + +#include <stdbool.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> + +#include "celix_log.h" +#include "constants.h" + +#include <curl/curl.h> +#include "etcd.h" +#include "etcd_watcher.h" + +#include "pubsub_discovery.h" +#include "pubsub_discovery_impl.h" + + +#define MAX_ROOTNODE_LENGTH 128 +#define MAX_LOCALNODE_LENGTH 4096 +#define MAX_FIELD_LENGTH 128 + +#define CFG_ETCD_SERVER_IP "PUBSUB_DISCOVERY_ETCD_SERVER_IP" +#define DEFAULT_ETCD_SERVER_IP "127.0.0.1" + +#define CFG_ETCD_SERVER_PORT "PUBSUB_DISCOVERY_ETCD_SERVER_PORT" +#define DEFAULT_ETCD_SERVER_PORT 2379 + +// be careful - this should be higher than the curl timeout +#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL" +#define DEFAULT_ETCD_TTL 30 + + +celix_status_t etcdCommon_init(bundle_context_pt context) { + celix_status_t status = CELIX_SUCCESS; + const char* etcd_server = NULL; + const char* etcd_port_string = NULL; + int etcd_port = 0; + + if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_IP, &etcd_server) != CELIX_SUCCESS) || !etcd_server) { + etcd_server = DEFAULT_ETCD_SERVER_IP; + } + + if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_PORT, &etcd_port_string) != CELIX_SUCCESS) || !etcd_port_string) { + etcd_port = DEFAULT_ETCD_SERVER_PORT; + } else { + char* endptr = NULL; + errno = 0; + etcd_port = strtol(etcd_port_string, &endptr, 10); + if (*endptr || errno != 0) { + etcd_port = DEFAULT_ETCD_SERVER_PORT; + } + } + + printf("PSD: Using discovery HOST:PORT: %s:%i\n", etcd_server, etcd_port); + + if (etcd_init(etcd_server, etcd_port, CURL_GLOBAL_DEFAULT) != 0) { + status = CELIX_BUNDLE_EXCEPTION; + } else { + status = CELIX_SUCCESS; + } + + return status; +} + http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/etcd_common.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/src/etcd_common.h b/pubsub/pubsub_discovery/src/etcd_common.h new file mode 100644 index 
0000000..7a3e7b6 --- /dev/null +++ b/pubsub/pubsub_discovery/src/etcd_common.h @@ -0,0 +1,28 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#ifndef ETCD_COMMON_H_ +#define ETCD_COMMON_H_ + +#include "bundle_context.h" +#include "celix_errno.h" + +celix_status_t etcdCommon_init(bundle_context_pt context); + +#endif /* ETCD_COMMON_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/etcd_watcher.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/src/etcd_watcher.c b/pubsub/pubsub_discovery/src/etcd_watcher.c new file mode 100644 index 0000000..3c3a5a8 --- /dev/null +++ b/pubsub/pubsub_discovery/src/etcd_watcher.c @@ -0,0 +1,290 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#include <stdbool.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> + +#include "celix_log.h" +#include "constants.h" + +#include "etcd.h" +#include "etcd_watcher.h" + +#include "pubsub_discovery.h" +#include "pubsub_discovery_impl.h" + + + +#define MAX_ROOTNODE_LENGTH 128 +#define MAX_LOCALNODE_LENGTH 4096 +#define MAX_FIELD_LENGTH 128 + +#define CFG_ETCD_ROOT_PATH "PUBSUB_DISCOVERY_ETCD_ROOT_PATH" +#define DEFAULT_ETCD_ROOTPATH "pubsub/discovery" + +#define CFG_ETCD_SERVER_IP "PUBSUB_DISCOVERY_ETCD_SERVER_IP" +#define DEFAULT_ETCD_SERVER_IP "127.0.0.1" + +#define CFG_ETCD_SERVER_PORT "PUBSUB_DISCOVERY_ETCD_SERVER_PORT" +#define DEFAULT_ETCD_SERVER_PORT 2379 + +// be careful - this should be higher than the curl timeout +#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL" +#define DEFAULT_ETCD_TTL 30 + + +struct etcd_watcher { + pubsub_discovery_pt pubsub_discovery; + + celix_thread_mutex_t watcherLock; + celix_thread_t watcherThread; + + char *scope; + char *topic; + volatile bool running; +}; + +struct etcd_writer { + pubsub_discovery_pt pubsub_discovery; + celix_thread_mutex_t localPubsLock; + array_list_pt localPubs; + volatile bool running; + celix_thread_t writerThread; +}; + + +// note that the rootNode shouldn't have a leading slash +static celix_status_t etcdWatcher_getTopicRootPath(bundle_context_pt context, const char *scope, const char *topic, char* rootNode, int rootNodeLen) { + celix_status_t status = CELIX_SUCCESS; + const char* rootPath = NULL; + + if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != 
CELIX_SUCCESS) || (!rootPath)) { + snprintf(rootNode, rootNodeLen, "%s/%s/%s", DEFAULT_ETCD_ROOTPATH, scope, topic); + } else { + snprintf(rootNode, rootNodeLen, "%s/%s/%s", rootPath, scope, topic); + } + + return status; +} + +static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* rootNode) { + celix_status_t status = CELIX_SUCCESS; + const char* rootPath = NULL; + + if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) { + strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH); + } else { + strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH); + } + + return status; +} + + +static void add_node(const char *key, const char *value, void* arg) { + pubsub_discovery_pt ps_discovery = (pubsub_discovery_pt) arg; + pubsub_endpoint_pt pubEP = NULL; + celix_status_t status = etcdWatcher_getPublisherEndpointFromKey(ps_discovery, key, value, &pubEP); + if(!status && pubEP) { + pubsub_discovery_addNode(ps_discovery, pubEP); + } +} + +static celix_status_t etcdWatcher_addAlreadyExistingPublishers(pubsub_discovery_pt ps_discovery, const char *rootPath, long long * highestModified) { + celix_status_t status = CELIX_SUCCESS; + if(etcd_get_directory(rootPath, add_node, ps_discovery, highestModified)) { + status = CELIX_ILLEGAL_ARGUMENT; + } + return status; +} + +// gets everything from provided key +celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsub_discovery, const char* etcdKey, const char* etcdValue, pubsub_endpoint_pt* pubEP) { + + celix_status_t status = CELIX_SUCCESS; + + char rootPath[MAX_ROOTNODE_LENGTH]; + char *expr = NULL; + char scope[MAX_FIELD_LENGTH]; + char topic[MAX_FIELD_LENGTH]; + char fwUUID[MAX_FIELD_LENGTH]; + char serviceId[MAX_FIELD_LENGTH]; + + memset(rootPath,0,MAX_ROOTNODE_LENGTH); + memset(topic,0,MAX_FIELD_LENGTH); + memset(fwUUID,0,MAX_FIELD_LENGTH); + memset(serviceId,0,MAX_FIELD_LENGTH); + + 
etcdWatcher_getRootPath(pubsub_discovery->context, rootPath); + + asprintf(&expr, "/%s/%%[^/]/%%[^/]/%%[^/]/%%[^/].*", rootPath); + if(expr) { + int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, serviceId); + free(expr); + if (foundItems != 4) { // Could happen when a directory is removed, just don't process this. + status = CELIX_ILLEGAL_STATE; + } + else{ + status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,NULL,pubEP); + } + } + return status; +} + +/* + * performs (blocking) etcd_watch calls to check for + * changing discovery endpoint information within etcd. + */ +static void* etcdWatcher_run(void* data) { + etcd_watcher_pt watcher = (etcd_watcher_pt) data; + time_t timeBeforeWatch = time(NULL); + char rootPath[MAX_ROOTNODE_LENGTH]; + long long highestModified = 0; + + pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery; + bundle_context_pt context = ps_discovery->context; + + memset(rootPath, 0, MAX_ROOTNODE_LENGTH); + + //TODO: add topic to etcd key + etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH); + etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, &highestModified); + + while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) && watcher->running) { + + char *rkey = NULL; + char *value = NULL; + char *preValue = NULL; + char *action = NULL; + long long modIndex; + + celixThreadMutex_unlock(&watcher->watcherLock); + + if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) { + pubsub_endpoint_pt pubEP = NULL; + if ((strcmp(action, "set") == 0) || (strcmp(action, "create") == 0)) { + if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) { + pubsub_discovery_addNode(ps_discovery, pubEP); + } + } else if (strcmp(action, "delete") == 0) { + if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == 
CELIX_SUCCESS) { + pubsub_discovery_removeNode(ps_discovery, pubEP); + } + } else if (strcmp(action, "expire") == 0) { + if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) { + pubsub_discovery_removeNode(ps_discovery, pubEP); + } + } else if (strcmp(action, "update") == 0) { + if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) { + pubsub_discovery_addNode(ps_discovery, pubEP); + } + } else { + fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action); + } + highestModified = modIndex; + } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) { + sleep(DEFAULT_ETCD_TTL / 4); + } + + FREE_MEM(action); + FREE_MEM(value); + FREE_MEM(preValue); + FREE_MEM(rkey); + + /* prevent busy waiting, in case etcd_watch returns false */ + + + if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) { + timeBeforeWatch = time(NULL); + } + + } + + if (watcher->running == false) { + celixThreadMutex_unlock(&watcher->watcherLock); + } + + return NULL; +} + +celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_context_pt context, const char *scope, const char *topic, etcd_watcher_pt *watcher) { + celix_status_t status = CELIX_SUCCESS; + + + if (pubsub_discovery == NULL) { + return CELIX_BUNDLE_EXCEPTION; + } + + (*watcher) = calloc(1, sizeof(struct etcd_watcher)); + + if(*watcher == NULL){ + return CELIX_ENOMEM; + } + + (*watcher)->pubsub_discovery = pubsub_discovery; + (*watcher)->scope = strdup(scope); + (*watcher)->topic = strdup(topic); + + + celixThreadMutex_create(&(*watcher)->watcherLock, NULL); + + celixThreadMutex_lock(&(*watcher)->watcherLock); + + status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher); + if (status == CELIX_SUCCESS) { + (*watcher)->running = true; + } + + celixThreadMutex_unlock(&(*watcher)->watcherLock); + + + return status; +} + +celix_status_t 
etcdWatcher_destroy(etcd_watcher_pt watcher) { + + celix_status_t status = CELIX_SUCCESS; + + char rootPath[MAX_ROOTNODE_LENGTH]; + etcdWatcher_getTopicRootPath(watcher->pubsub_discovery->context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH); + celixThreadMutex_destroy(&(watcher->watcherLock)); + + free(watcher->scope); + free(watcher->topic); + free(watcher); + + return status; +} + +celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&(watcher->watcherLock)); + watcher->running = false; + celixThreadMutex_unlock(&(watcher->watcherLock)); + + celixThread_join(watcher->watcherThread, NULL); + + return status; + +} + + http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/etcd_watcher.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/src/etcd_watcher.h b/pubsub/pubsub_discovery/src/etcd_watcher.h new file mode 100644 index 0000000..c425e60 --- /dev/null +++ b/pubsub/pubsub_discovery/src/etcd_watcher.h @@ -0,0 +1,38 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. 
+ */ + +#ifndef ETCD_WATCHER_H_ +#define ETCD_WATCHER_H_ + +#include "bundle_context.h" +#include "celix_errno.h" + +#include "pubsub_discovery.h" +#include "pubsub_endpoint.h" + +typedef struct etcd_watcher *etcd_watcher_pt; + +celix_status_t etcdWatcher_create(pubsub_discovery_pt discovery, bundle_context_pt context, const char *scope, const char* topic, etcd_watcher_pt *watcher); +celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher); +celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher); + +celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt discovery, const char* key, const char* value, pubsub_endpoint_pt* pubEP); + + +#endif /* ETCD_WATCHER_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/etcd_writer.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/src/etcd_writer.c b/pubsub/pubsub_discovery/src/etcd_writer.c new file mode 100644 index 0000000..1c423f3 --- /dev/null +++ b/pubsub/pubsub_discovery/src/etcd_writer.c @@ -0,0 +1,189 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. 
+ */
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "celix_log.h"
+#include "constants.h"
+
+#include "etcd.h"
+#include "etcd_writer.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+#define MAX_ROOTNODE_LENGTH 128
+
+#define CFG_ETCD_ROOT_PATH "PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
+#define DEFAULT_ETCD_ROOTPATH "pubsub/discovery"
+
+#define CFG_ETCD_SERVER_IP "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
+#define DEFAULT_ETCD_SERVER_IP "127.0.0.1"
+
+#define CFG_ETCD_SERVER_PORT "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
+#define DEFAULT_ETCD_SERVER_PORT 2379
+
+// be careful - this should be higher than the curl timeout
+#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL"
+#define DEFAULT_ETCD_TTL 30
+
+struct etcd_writer {
+    pubsub_discovery_pt pubsub_discovery;
+    celix_thread_mutex_t localPubsLock;
+    array_list_pt localPubs;
+    volatile bool running;
+    celix_thread_t writerThread;
+};
+
+
+static const char* etcdWriter_getRootPath(bundle_context_pt context);
+static void* etcdWriter_run(void* data);
+
+
+/*
+ * Allocates a writer for the given discovery instance and starts the thread
+ * running etcdWriter_run.
+ *
+ * Returns the new writer, or NULL when the writer cannot be allocated, its
+ * publisher list cannot be created, or the writer thread cannot be started.
+ * In every failure case nothing is left allocated.
+ */
+etcd_writer_pt etcdWriter_create(pubsub_discovery_pt disc) {
+    etcd_writer_pt writer = calloc(1, sizeof(*writer));
+    if(writer) {
+        celixThreadMutex_create(&writer->localPubsLock, NULL);
+        arrayList_create(&writer->localPubs);
+        writer->pubsub_discovery = disc;
+        writer->running = true;
+
+        /* A writer without its list or thread is unusable (etcdWriter_destroy joins the thread), so fail here instead. */
+        if(writer->localPubs == NULL
+                || celixThread_create(&writer->writerThread, NULL, etcdWriter_run, writer) != CELIX_SUCCESS) {
+            if(writer->localPubs != NULL) {
+                arrayList_destroy(writer->localPubs);
+            }
+            celixThreadMutex_destroy(&writer->localPubsLock);
+            free(writer);
+            writer = NULL;
+        }
+    }
+    return writer;
+}
+
+void etcdWriter_destroy(etcd_writer_pt writer) {
+    char dir[MAX_ROOTNODE_LENGTH];
+    const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+    writer->running = false;
+    celixThread_join(writer->writerThread, NULL);
+
+    celixThreadMutex_lock(&writer->localPubsLock);
+    for(int i = 0; i < arrayList_size(writer->localPubs); i++) {
+        pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(writer->localPubs,i);
+        memset(dir,0,MAX_ROOTNODE_LENGTH);
+        
snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID);
+        etcd_del(dir);
+        pubsubEndpoint_destroy(pubEP);
+    }
+    arrayList_destroy(writer->localPubs);
+
+    celixThreadMutex_unlock(&writer->localPubsLock);
+    celixThreadMutex_destroy(&(writer->localPubsLock));
+
+    free(writer);
+}
+
+/*
+ * Writes a publisher endpoint to etcd under
+ * <rootPath>/<scope>/<topic>/<frameworkUUID>/<serviceID> with the configured TTL.
+ *
+ * writer   writer whose discovery context supplies the configuration.
+ * pubEP    endpoint to publish; its endpoint string is the stored value.
+ * storeEP  when true and pubEP belongs to this framework (matching framework
+ *          UUID), a clone is added to the writer's local publisher list.
+ *
+ * The TTL comes from the CFG_ETCD_TTL property; DEFAULT_ETCD_TTL is used when
+ * the property is missing or is not a complete integer that fits an int.
+ *
+ * Returns CELIX_SUCCESS when the key was set, CELIX_ENOMEM when the key string
+ * cannot be built and CELIX_ILLEGAL_ARGUMENT when etcd_set fails.
+ */
+celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP, bool storeEP){
+    celix_status_t status = CELIX_SUCCESS;
+    char *key = NULL;
+    const char* ttlStr = NULL;
+    int ttl = DEFAULT_ETCD_TTL;
+
+    if(storeEP){
+        const char *fwUUID = NULL;
+        bundleContext_getProperty(writer->pubsub_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+        if(fwUUID && strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
+            celixThreadMutex_lock(&writer->localPubsLock);
+            pubsub_endpoint_pt p = NULL;
+            pubsubEndpoint_clone(pubEP, &p);
+            /* Never store a failed clone: the list entries are dereferenced when they are refreshed and destroyed. */
+            if(p != NULL) {
+                arrayList_add(writer->localPubs,p);
+            }
+            celixThreadMutex_unlock(&writer->localPubsLock);
+        }
+    }
+
+    // determine ttl
+    if ((bundleContext_getProperty(writer->pubsub_discovery->context, CFG_ETCD_TTL, &ttlStr) == CELIX_SUCCESS) && ttlStr) {
+        char* endptr = NULL;
+        errno = 0;
+        long parsedTtl = strtol(ttlStr, &endptr, 10);
+        /* Accept only a non-empty, fully consumed number that survives the narrowing to int. */
+        if (endptr != ttlStr && *endptr == '\0' && errno == 0 && (long)(int)parsedTtl == parsedTtl) {
+            ttl = (int)parsedTtl;
+        }
+    }
+
+    const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+    /* On failure asprintf leaves key undefined, so it must be neither used nor freed. */
+    if (asprintf(&key,"%s/%s/%s/%s/%ld",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID,pubEP->serviceID) < 0) {
+        return CELIX_ENOMEM;
+    }
+
+    /*
+     * etcd_set is treated as returning 0 on success, the same convention this
+     * file applies to etcd_del. NOTE(review): confirm against etcd.h.
+     */
+    if(etcd_set(key,pubEP->endpoint,ttl,false) != 0){
+        status = CELIX_ILLEGAL_ARGUMENT;
+    }
+    FREE_MEM(key);
+    return status;
+}
+
+celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP) {
+    celix_status_t status = CELIX_SUCCESS;
+    char *key = NULL;
+
+    const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+    asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, pubEP->frameworkUUID, pubEP->serviceID);
+
+    
celixThreadMutex_lock(&writer->localPubsLock); + for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) { + pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i); + if (pubsubEndpoint_equals(ep, pubEP)) { + arrayList_remove(writer->localPubs, i); + pubsubEndpoint_destroy(ep); + break; + } + } + celixThreadMutex_unlock(&writer->localPubsLock); + + if (etcd_del(key)) { + printf("Failed to remove key %s from ETCD\n",key); + status = CELIX_ILLEGAL_ARGUMENT; + } + FREE_MEM(key); + return status; +} + +static void* etcdWriter_run(void* data) { + etcd_writer_pt writer = (etcd_writer_pt)data; + while(writer->running) { + celixThreadMutex_lock(&writer->localPubsLock); + for(int i=0; i < arrayList_size(writer->localPubs); i++) { + etcdWriter_addPublisherEndpoint(writer,(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i),false); + } + celixThreadMutex_unlock(&writer->localPubsLock); + sleep(DEFAULT_ETCD_TTL / 2); + } + + return NULL; +} + +static const char* etcdWriter_getRootPath(bundle_context_pt context) { + const char* rootPath = NULL; + bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath); + if(rootPath == NULL) { + rootPath = DEFAULT_ETCD_ROOTPATH; + } + return rootPath; +} + http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/etcd_writer.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/src/etcd_writer.h b/pubsub/pubsub_discovery/src/etcd_writer.h new file mode 100644 index 0000000..3ff98b9 --- /dev/null +++ b/pubsub/pubsub_discovery/src/etcd_writer.h @@ -0,0 +1,39 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. 
The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#ifndef ETCD_WRITER_H_ +#define ETCD_WRITER_H_ + +#include "bundle_context.h" +#include "celix_errno.h" + +#include "pubsub_discovery.h" +#include "pubsub_endpoint.h" + +typedef struct etcd_writer *etcd_writer_pt; + + +etcd_writer_pt etcdWriter_create(pubsub_discovery_pt discovery); +void etcdWriter_destroy(etcd_writer_pt writer); + +celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP,bool storeEP); +celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP); + + +#endif /* ETCD_WRITER_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/psd_activator.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/src/psd_activator.c b/pubsub/pubsub_discovery/src/psd_activator.c new file mode 100644 index 0000000..89a517d --- /dev/null +++ b/pubsub/pubsub_discovery/src/psd_activator.c @@ -0,0 +1,171 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "bundle_activator.h" +#include "service_tracker.h" +#include "service_registration.h" +#include "constants.h" +#include "celix_log.h" + +#include "pubsub_common.h" +#include "publisher_endpoint_announce.h" +#include "pubsub_discovery.h" +#include "pubsub_discovery_impl.h" + +struct activator { + bundle_context_pt context; + pubsub_discovery_pt pubsub_discovery; + + service_tracker_pt pstmPublishersTracker; + + publisher_endpoint_announce_pt publisherEPAnnounce; + service_registration_pt publisherEPAnnounceService; +}; + +static celix_status_t createTMPublisherAnnounceTracker(struct activator *activator, service_tracker_pt *tracker) { + celix_status_t status = CELIX_SUCCESS; + + service_tracker_customizer_pt customizer = NULL; + + status = serviceTrackerCustomizer_create(activator->pubsub_discovery, + NULL, + pubsub_discovery_tmPublisherAnnounceAdded, + pubsub_discovery_tmPublisherAnnounceModified, + pubsub_discovery_tmPublisherAnnounceRemoved, + &customizer); + + if (status == CELIX_SUCCESS) { + status = serviceTracker_create(activator->context, (char *) PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE, customizer, tracker); + } + + return status; +} + +celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { + celix_status_t status = CELIX_SUCCESS; + + struct activator* activator = calloc(1, sizeof(*activator)); + + if (activator) { + activator->context = context; + activator->pstmPublishersTracker = NULL; + activator->publisherEPAnnounce = NULL; + 
activator->publisherEPAnnounceService = NULL; + + status = pubsub_discovery_create(context, &activator->pubsub_discovery); + + if (status == CELIX_SUCCESS) { + status = createTMPublisherAnnounceTracker(activator, &(activator->pstmPublishersTracker)); + } + + if (status == CELIX_SUCCESS) { + *userData = activator; + } else { + free(activator); + } + } else { + status = CELIX_ENOMEM; + } + + return status; + +} + +celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) { + celix_status_t status = CELIX_SUCCESS; + + struct activator *activator = userData; + + publisher_endpoint_announce_pt pubEPAnnouncer = calloc(1, sizeof(*pubEPAnnouncer)); + + if (pubEPAnnouncer) { + + pubEPAnnouncer->handle = activator->pubsub_discovery; + pubEPAnnouncer->announcePublisher = pubsub_discovery_announcePublisher; + pubEPAnnouncer->removePublisher = pubsub_discovery_removePublisher; + pubEPAnnouncer->interestedInTopic = pubsub_discovery_interestedInTopic; + pubEPAnnouncer->uninterestedInTopic = pubsub_discovery_uninterestedInTopic; + activator->publisherEPAnnounce = pubEPAnnouncer; + + properties_pt props = properties_create(); + properties_set(props, "PUBSUB_DISCOVERY", "true"); + + // pubsub_discovery_start needs to be first to initalize the propert etcd_watcher values + status = pubsub_discovery_start(activator->pubsub_discovery); + + if (status == CELIX_SUCCESS) { + status = serviceTracker_open(activator->pstmPublishersTracker); + } + + if (status == CELIX_SUCCESS) { + status = bundleContext_registerService(context, (char *) PUBSUB_DISCOVERY_SERVICE, pubEPAnnouncer, props, &activator->publisherEPAnnounceService); + } + + + } + else{ + status = CELIX_ENOMEM; + } + + if(status!=CELIX_SUCCESS && pubEPAnnouncer!=NULL){ + free(pubEPAnnouncer); + } + + + return status; +} + +celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) { + celix_status_t status = CELIX_SUCCESS; + struct activator *activator = userData; + + status += 
pubsub_discovery_stop(activator->pubsub_discovery); + + status += serviceTracker_close(activator->pstmPublishersTracker); + + status += serviceRegistration_unregister(activator->publisherEPAnnounceService); + + if (status == CELIX_SUCCESS) { + free(activator->publisherEPAnnounce); + } + + return status; +} + +celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) { + celix_status_t status = CELIX_SUCCESS; + struct activator *activator = userData; + + status += serviceTracker_destroy(activator->pstmPublishersTracker); + status += pubsub_discovery_destroy(activator->pubsub_discovery); + + activator->publisherEPAnnounce = NULL; + activator->publisherEPAnnounceService = NULL; + activator->pstmPublishersTracker = NULL; + activator->pubsub_discovery = NULL; + activator->context = NULL; + + free(activator); + + return status; +} http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/pubsub_discovery.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/src/pubsub_discovery.h b/pubsub/pubsub_discovery/src/pubsub_discovery.h new file mode 100644 index 0000000..f77905a --- /dev/null +++ b/pubsub/pubsub_discovery/src/pubsub_discovery.h @@ -0,0 +1,26 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#ifndef PUBSUB_DISCOVERY_H_ +#define PUBSUB_DISCOVERY_H_ + +typedef struct pubsub_discovery *pubsub_discovery_pt; + + +#endif /* PUBSUB_DISCOVERY_H_ */
