http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/CMakeLists.txt b/pubsub/pubsub_discovery/CMakeLists.txt
index 948da3e..0e7d6c5 100644
--- a/pubsub/pubsub_discovery/CMakeLists.txt
+++ b/pubsub/pubsub_discovery/CMakeLists.txt
@@ -18,26 +18,25 @@
 find_package(CURL REQUIRED)
 find_package(Jansson REQUIRED)
 
-include_directories("${CURL_INCLUDE_DIR}")
-include_directories("${JANSSON_INCLUDE_DIR}")
-include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
-include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
-include_directories("${PROJECT_SOURCE_DIR}/etcdlib/public/include")
-include_directories("private/include")
-include_directories("public/include")
-
 add_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
     BUNDLE_SYMBOLICNAME "apache_celix_pubsub_discovery_etcd"
     VERSION "1.0.0"
     SOURCES
-               private/src/psd_activator.c
-               private/src/pubsub_discovery_impl.c
-               private/src/etcd_common.c
-               private/src/etcd_watcher.c
-               private/src/etcd_writer.c
+        src/psd_activator.c
+        src/pubsub_discovery_impl.c
+        src/etcd_common.c
+        src/etcd_watcher.c
+        src/etcd_writer.c
                ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
                ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
 )
 
-target_link_libraries(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery celix_framework celix_utils etcdlib_static ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
+target_include_directories(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery PRIVATE
+    src
+    include
+       ${CURL_INCLUDE_DIR}
+       ${JANSSON_INCLUDE_DIR}
+        )
+
+target_link_libraries(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery PRIVATE Celix::framework Celix::etcdlib_static ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
 install_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery)

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/include/etcd_common.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/include/etcd_common.h b/pubsub/pubsub_discovery/private/include/etcd_common.h
deleted file mode 100644
index 7a3e7b6..0000000
--- a/pubsub/pubsub_discovery/private/include/etcd_common.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef ETCD_COMMON_H_
-#define ETCD_COMMON_H_
-
-#include "bundle_context.h"
-#include "celix_errno.h"
-
-celix_status_t etcdCommon_init(bundle_context_pt context);
-
-#endif /* ETCD_COMMON_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/include/etcd_watcher.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/include/etcd_watcher.h b/pubsub/pubsub_discovery/private/include/etcd_watcher.h
deleted file mode 100644
index c425e60..0000000
--- a/pubsub/pubsub_discovery/private/include/etcd_watcher.h
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef ETCD_WATCHER_H_
-#define ETCD_WATCHER_H_
-
-#include "bundle_context.h"
-#include "celix_errno.h"
-
-#include "pubsub_discovery.h"
-#include "pubsub_endpoint.h"
-
-typedef struct etcd_watcher *etcd_watcher_pt;
-
-celix_status_t etcdWatcher_create(pubsub_discovery_pt discovery,  bundle_context_pt context, const char *scope, const char* topic, etcd_watcher_pt *watcher);
-celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher);
-celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher);
-
-celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt discovery, const char* key, const char* value, pubsub_endpoint_pt* pubEP);
-
-
-#endif /* ETCD_WATCHER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/include/etcd_writer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/include/etcd_writer.h b/pubsub/pubsub_discovery/private/include/etcd_writer.h
deleted file mode 100644
index 3ff98b9..0000000
--- a/pubsub/pubsub_discovery/private/include/etcd_writer.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef ETCD_WRITER_H_
-#define ETCD_WRITER_H_
-
-#include "bundle_context.h"
-#include "celix_errno.h"
-
-#include "pubsub_discovery.h"
-#include "pubsub_endpoint.h"
-
-typedef struct etcd_writer *etcd_writer_pt;
-
-
-etcd_writer_pt etcdWriter_create(pubsub_discovery_pt discovery);
-void etcdWriter_destroy(etcd_writer_pt writer);
-
-celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP,bool storeEP);
-celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP);
-
-
-#endif /* ETCD_WRITER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h b/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
deleted file mode 100644
index 676a6ab..0000000
--- a/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef PUBSUB_DISCOVERY_IMPL_H_
-#define PUBSUB_DISCOVERY_IMPL_H_
-
-#include "bundle_context.h"
-#include "service_reference.h"
-
-#include "etcd_watcher.h"
-#include "etcd_writer.h"
-#include "pubsub_endpoint.h"
-
-#define FREE_MEM(ptr) if(ptr) {free(ptr); ptr = NULL;}
-
-struct watcher_info {
-    etcd_watcher_pt watcher;
-    int nr_references;
-};
-
-struct pubsub_discovery {
-       bundle_context_pt context;
-
-       celix_thread_mutex_t discoveredPubsMutex;
-       hash_map_pt discoveredPubs; //<topic,List<pubsub_endpoint_pt>>
-
-       celix_thread_mutex_t listenerReferencesMutex;
-       hash_map_pt listenerReferences; //key=serviceReference, value=nop
-
-       celix_thread_mutex_t watchersMutex;
-       hash_map_pt watchers; //key = topicname, value = struct watcher_info
-
-       etcd_writer_pt writer;
-};
-
-
-celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt* node_discovery);
-celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt node_discovery);
-celix_status_t pubsub_discovery_start(pubsub_discovery_pt node_discovery);
-celix_status_t pubsub_discovery_stop(pubsub_discovery_pt node_discovery);
-
-celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP);
-celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP);
-
-celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service);
-celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service);
-celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service);
-
-celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP);
-celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP);
-celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic);
-celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic);
-
-celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt discovery, pubsub_endpoint_pt endpoint, bool endpointAdded);
-
-#endif /* PUBSUB_DISCOVERY_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/src/etcd_common.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/etcd_common.c b/pubsub/pubsub_discovery/private/src/etcd_common.c
deleted file mode 100644
index c757801..0000000
--- a/pubsub/pubsub_discovery/private/src/etcd_common.c
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#include <stdbool.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-
-#include "celix_log.h"
-#include "constants.h"
-
-#include <curl/curl.h>
-#include "etcd.h"
-#include "etcd_watcher.h"
-
-#include "pubsub_discovery.h"
-#include "pubsub_discovery_impl.h"
-
-
-#define MAX_ROOTNODE_LENGTH            128
-#define MAX_LOCALNODE_LENGTH   4096
-#define MAX_FIELD_LENGTH               128
-
-#define CFG_ETCD_SERVER_IP             "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
-#define DEFAULT_ETCD_SERVER_IP "127.0.0.1"
-
-#define CFG_ETCD_SERVER_PORT   "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
-#define DEFAULT_ETCD_SERVER_PORT 2379
-
-// be careful - this should be higher than the curl timeout
-#define CFG_ETCD_TTL   "DISCOVERY_ETCD_TTL"
-#define DEFAULT_ETCD_TTL 30
-
-
-celix_status_t etcdCommon_init(bundle_context_pt context) {
-    celix_status_t status = CELIX_SUCCESS;
-    const char* etcd_server = NULL;
-    const char* etcd_port_string = NULL;
-    int etcd_port = 0;
-
-    if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_IP, &etcd_server) != CELIX_SUCCESS) || !etcd_server) {
-        etcd_server = DEFAULT_ETCD_SERVER_IP;
-    }
-
-    if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_PORT, &etcd_port_string) != CELIX_SUCCESS) || !etcd_port_string) {
-        etcd_port = DEFAULT_ETCD_SERVER_PORT;
-    } else {
-        char* endptr = NULL;
-        errno = 0;
-        etcd_port = strtol(etcd_port_string, &endptr, 10);
-        if (*endptr || errno != 0) {
-            etcd_port = DEFAULT_ETCD_SERVER_PORT;
-        }
-    }
-
-    printf("PSD: Using discovery HOST:PORT: %s:%i\n", etcd_server, etcd_port);
-
-    if (etcd_init(etcd_server, etcd_port, CURL_GLOBAL_DEFAULT) != 0) {
-        status = CELIX_BUNDLE_EXCEPTION;
-    } else {
-        status = CELIX_SUCCESS;
-    }
-
-    return status;
-}
-

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/etcd_watcher.c b/pubsub/pubsub_discovery/private/src/etcd_watcher.c
deleted file mode 100644
index 3c3a5a8..0000000
--- a/pubsub/pubsub_discovery/private/src/etcd_watcher.c
+++ /dev/null
@@ -1,290 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#include <stdbool.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-
-#include "celix_log.h"
-#include "constants.h"
-
-#include "etcd.h"
-#include "etcd_watcher.h"
-
-#include "pubsub_discovery.h"
-#include "pubsub_discovery_impl.h"
-
-
-
-#define MAX_ROOTNODE_LENGTH             128
-#define MAX_LOCALNODE_LENGTH            4096
-#define MAX_FIELD_LENGTH                128
-
-#define CFG_ETCD_ROOT_PATH              "PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
-#define DEFAULT_ETCD_ROOTPATH           "pubsub/discovery"
-
-#define CFG_ETCD_SERVER_IP              "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
-#define DEFAULT_ETCD_SERVER_IP          "127.0.0.1"
-
-#define CFG_ETCD_SERVER_PORT            "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
-#define DEFAULT_ETCD_SERVER_PORT        2379
-
-// be careful - this should be higher than the curl timeout
-#define CFG_ETCD_TTL                    "DISCOVERY_ETCD_TTL"
-#define DEFAULT_ETCD_TTL                30
-
-
-struct etcd_watcher {
-       pubsub_discovery_pt pubsub_discovery;
-
-       celix_thread_mutex_t watcherLock;
-       celix_thread_t watcherThread;
-
-       char *scope;
-       char *topic;
-       volatile bool running;
-};
-
-struct etcd_writer {
-       pubsub_discovery_pt pubsub_discovery;
-       celix_thread_mutex_t localPubsLock;
-       array_list_pt localPubs;
-       volatile bool running;
-       celix_thread_t writerThread;
-};
-
-
-// note that the rootNode shouldn't have a leading slash
-static celix_status_t etcdWatcher_getTopicRootPath(bundle_context_pt context, const char *scope, const char *topic, char* rootNode, int rootNodeLen) {
-       celix_status_t status = CELIX_SUCCESS;
-       const char* rootPath = NULL;
-
-       if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
-               snprintf(rootNode, rootNodeLen, "%s/%s/%s", DEFAULT_ETCD_ROOTPATH, scope, topic);
-       } else {
-               snprintf(rootNode, rootNodeLen, "%s/%s/%s", rootPath, scope, topic);
-       }
-
-       return status;
-}
-
-static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* rootNode) {
-       celix_status_t status = CELIX_SUCCESS;
-       const char* rootPath = NULL;
-
-       if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
-               strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH);
-       } else {
-               strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH);
-       }
-
-       return status;
-}
-
-
-static void add_node(const char *key, const char *value, void* arg) {
-       pubsub_discovery_pt ps_discovery = (pubsub_discovery_pt) arg;
-       pubsub_endpoint_pt pubEP = NULL;
-       celix_status_t status = etcdWatcher_getPublisherEndpointFromKey(ps_discovery, key, value, &pubEP);
-       if(!status && pubEP) {
-               pubsub_discovery_addNode(ps_discovery, pubEP);
-       }
-}
-
-static celix_status_t etcdWatcher_addAlreadyExistingPublishers(pubsub_discovery_pt ps_discovery, const char *rootPath, long long * highestModified) {
-       celix_status_t status = CELIX_SUCCESS;
-       if(etcd_get_directory(rootPath, add_node, ps_discovery, highestModified)) {
-               status = CELIX_ILLEGAL_ARGUMENT;
-       }
-       return status;
-}
-
-// gets everything from provided key
-celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsub_discovery, const char* etcdKey, const char* etcdValue, pubsub_endpoint_pt* pubEP) {
-
-       celix_status_t status = CELIX_SUCCESS;
-
-       char rootPath[MAX_ROOTNODE_LENGTH];
-       char *expr = NULL;
-       char scope[MAX_FIELD_LENGTH];
-       char topic[MAX_FIELD_LENGTH];
-       char fwUUID[MAX_FIELD_LENGTH];
-       char serviceId[MAX_FIELD_LENGTH];
-
-       memset(rootPath,0,MAX_ROOTNODE_LENGTH);
-       memset(topic,0,MAX_FIELD_LENGTH);
-       memset(fwUUID,0,MAX_FIELD_LENGTH);
-       memset(serviceId,0,MAX_FIELD_LENGTH);
-
-       etcdWatcher_getRootPath(pubsub_discovery->context, rootPath);
-
-       asprintf(&expr, "/%s/%%[^/]/%%[^/]/%%[^/]/%%[^/].*", rootPath);
-       if(expr) {
-               int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, serviceId);
-               free(expr);
-               if (foundItems != 4) { // Could happen when a directory is removed, just don't process this.
-                       status = CELIX_ILLEGAL_STATE;
-               }
-               else{
-                       status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,NULL,pubEP);
-               }
-       }
-       return status;
-}
-
-/*
- * performs (blocking) etcd_watch calls to check for
- * changing discovery endpoint information within etcd.
- */
-static void* etcdWatcher_run(void* data) {
-       etcd_watcher_pt watcher = (etcd_watcher_pt) data;
-       time_t timeBeforeWatch = time(NULL);
-       char rootPath[MAX_ROOTNODE_LENGTH];
-       long long highestModified = 0;
-
-       pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery;
-       bundle_context_pt context = ps_discovery->context;
-
-       memset(rootPath, 0, MAX_ROOTNODE_LENGTH);
-
-       //TODO: add topic to etcd key
-       etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH);
-       etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, &highestModified);
-
-       while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) && watcher->running) {
-
-               char *rkey = NULL;
-               char *value = NULL;
-               char *preValue = NULL;
-               char *action = NULL;
-               long long modIndex;
-
-               celixThreadMutex_unlock(&watcher->watcherLock);
-
-               if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) {
-                       pubsub_endpoint_pt pubEP = NULL;
-                       if ((strcmp(action, "set") == 0) || (strcmp(action, "create") == 0)) {
-                               if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
-                                       pubsub_discovery_addNode(ps_discovery, pubEP);
-                               }
-                       } else if (strcmp(action, "delete") == 0) {
-                               if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
-                                       pubsub_discovery_removeNode(ps_discovery, pubEP);
-                               }
-                       } else if (strcmp(action, "expire") == 0) {
-                               if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
-                                       pubsub_discovery_removeNode(ps_discovery, pubEP);
-                               }
-                       } else if (strcmp(action, "update") == 0) {
-                               if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
-                                       pubsub_discovery_addNode(ps_discovery, pubEP);
-                               }
-                       } else {
-                               fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action);
-                       }
-                       highestModified = modIndex;
-               } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) {
-                       sleep(DEFAULT_ETCD_TTL / 4);
-               }
-
-               FREE_MEM(action);
-               FREE_MEM(value);
-               FREE_MEM(preValue);
-               FREE_MEM(rkey);
-
-               /* prevent busy waiting, in case etcd_watch returns false */
-
-
-               if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) {
-                       timeBeforeWatch = time(NULL);
-               }
-
-       }
-
-       if (watcher->running == false) {
-               celixThreadMutex_unlock(&watcher->watcherLock);
-       }
-
-       return NULL;
-}
-
-celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_context_pt context, const char *scope, const char *topic, etcd_watcher_pt *watcher) {
-       celix_status_t status = CELIX_SUCCESS;
-
-
-       if (pubsub_discovery == NULL) {
-               return CELIX_BUNDLE_EXCEPTION;
-       }
-
-       (*watcher) = calloc(1, sizeof(struct etcd_watcher));
-
-       if(*watcher == NULL){
-               return CELIX_ENOMEM;
-       }
-
-       (*watcher)->pubsub_discovery = pubsub_discovery;
-       (*watcher)->scope = strdup(scope);
-       (*watcher)->topic = strdup(topic);
-
-
-       celixThreadMutex_create(&(*watcher)->watcherLock, NULL);
-
-       celixThreadMutex_lock(&(*watcher)->watcherLock);
-
-       status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher);
-       if (status == CELIX_SUCCESS) {
-               (*watcher)->running = true;
-       }
-
-       celixThreadMutex_unlock(&(*watcher)->watcherLock);
-
-
-       return status;
-}
-
-celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) {
-
-       celix_status_t status = CELIX_SUCCESS;
-
-       char rootPath[MAX_ROOTNODE_LENGTH];
-       etcdWatcher_getTopicRootPath(watcher->pubsub_discovery->context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH);
-       celixThreadMutex_destroy(&(watcher->watcherLock));
-
-       free(watcher->scope);
-       free(watcher->topic);
-       free(watcher);
-
-       return status;
-}
-
-celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher){
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&(watcher->watcherLock));
-       watcher->running = false;
-       celixThreadMutex_unlock(&(watcher->watcherLock));
-
-       celixThread_join(watcher->watcherThread, NULL);
-
-       return status;
-
-}
-
-

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/src/etcd_writer.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/etcd_writer.c b/pubsub/pubsub_discovery/private/src/etcd_writer.c
deleted file mode 100644
index 1c423f3..0000000
--- a/pubsub/pubsub_discovery/private/src/etcd_writer.c
+++ /dev/null
@@ -1,189 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#include <stdbool.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-
-#include "celix_log.h"
-#include "constants.h"
-
-#include "etcd.h"
-#include "etcd_writer.h"
-
-#include "pubsub_discovery.h"
-#include "pubsub_discovery_impl.h"
-
-#define MAX_ROOTNODE_LENGTH            128
-
-#define CFG_ETCD_ROOT_PATH             "PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
-#define DEFAULT_ETCD_ROOTPATH  "pubsub/discovery"
-
-#define CFG_ETCD_SERVER_IP             "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
-#define DEFAULT_ETCD_SERVER_IP "127.0.0.1"
-
-#define CFG_ETCD_SERVER_PORT   "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
-#define DEFAULT_ETCD_SERVER_PORT 2379
-
-// be careful - this should be higher than the curl timeout
-#define CFG_ETCD_TTL   "DISCOVERY_ETCD_TTL"
-#define DEFAULT_ETCD_TTL 30
-
-struct etcd_writer {
-       pubsub_discovery_pt pubsub_discovery;
-       celix_thread_mutex_t localPubsLock;
-       array_list_pt localPubs;
-       volatile bool running;
-       celix_thread_t writerThread;
-};
-
-
-static const char* etcdWriter_getRootPath(bundle_context_pt context);
-static void* etcdWriter_run(void* data);
-
-
-etcd_writer_pt etcdWriter_create(pubsub_discovery_pt disc) {
-       etcd_writer_pt writer = calloc(1, sizeof(*writer));
-       if(writer) {
-               celixThreadMutex_create(&writer->localPubsLock, NULL);
-               arrayList_create(&writer->localPubs);
-               writer->pubsub_discovery = disc;
-               writer->running = true;
-               celixThread_create(&writer->writerThread, NULL, etcdWriter_run, writer);
-       }
-       return writer;
-}
-
-void etcdWriter_destroy(etcd_writer_pt writer) {
-       char dir[MAX_ROOTNODE_LENGTH];
-       const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
-
-       writer->running = false;
-       celixThread_join(writer->writerThread, NULL);
-
-       celixThreadMutex_lock(&writer->localPubsLock);
-       for(int i = 0; i < arrayList_size(writer->localPubs); i++) {
-               pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(writer->localPubs,i);
-               memset(dir,0,MAX_ROOTNODE_LENGTH);
-               snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID);
-               etcd_del(dir);
-               pubsubEndpoint_destroy(pubEP);
-       }
-       arrayList_destroy(writer->localPubs);
-
-       celixThreadMutex_unlock(&writer->localPubsLock);
-       celixThreadMutex_destroy(&(writer->localPubsLock));
-
-       free(writer);
-}
-
-celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP, bool storeEP){
-       celix_status_t status = CELIX_BUNDLE_EXCEPTION;
-
-       if(storeEP){
-               const char *fwUUID = NULL;
-               bundleContext_getProperty(writer->pubsub_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
-               if(fwUUID && strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
-                       celixThreadMutex_lock(&writer->localPubsLock);
-                       pubsub_endpoint_pt p = NULL;
-                       pubsubEndpoint_clone(pubEP, &p);
-                       arrayList_add(writer->localPubs,p);
-                       celixThreadMutex_unlock(&writer->localPubsLock);
-               }
-       }
-
-       char *key;
-
-       const char* ttlStr = NULL;
-       int ttl = 0;
-
-       // determine ttl
-       if ((bundleContext_getProperty(writer->pubsub_discovery->context, CFG_ETCD_TTL, &ttlStr) != CELIX_SUCCESS) || !ttlStr) {
-               ttl = DEFAULT_ETCD_TTL;
-       } else {
-               char* endptr = NULL;
-               errno = 0;
-               ttl = strtol(ttlStr, &endptr, 10);
-               if (*endptr || errno != 0) {
-                       ttl = DEFAULT_ETCD_TTL;
-               }
-       }
-
-       const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
-
-       asprintf(&key,"%s/%s/%s/%s/%ld",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID,pubEP->serviceID);
-
-       if(!etcd_set(key,pubEP->endpoint,ttl,false)){
-               status = CELIX_ILLEGAL_ARGUMENT;
-       }
-       FREE_MEM(key);
-       return status;
-}
-
-celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP) {
-       celix_status_t status = CELIX_SUCCESS;
-       char *key = NULL;
-
-       const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
-
-       asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, pubEP->frameworkUUID, pubEP->serviceID);
-
-       celixThreadMutex_lock(&writer->localPubsLock);
-       for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) {
-               pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i);
-               if (pubsubEndpoint_equals(ep, pubEP)) {
-                       arrayList_remove(writer->localPubs, i);
-                       pubsubEndpoint_destroy(ep);
-                       break;
-               }
-       }
-       celixThreadMutex_unlock(&writer->localPubsLock);
-
-       if (etcd_del(key)) {
-               printf("Failed to remove key %s from ETCD\n",key);
-               status = CELIX_ILLEGAL_ARGUMENT;
-       }
-       FREE_MEM(key);
-       return status;
-}
-
-static void* etcdWriter_run(void* data) {
-       etcd_writer_pt writer = (etcd_writer_pt)data;
-       while(writer->running) {
-               celixThreadMutex_lock(&writer->localPubsLock);
-               for(int i=0; i < arrayList_size(writer->localPubs); i++) {
-                       etcdWriter_addPublisherEndpoint(writer,(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i),false);
-               }
-               celixThreadMutex_unlock(&writer->localPubsLock);
-               sleep(DEFAULT_ETCD_TTL / 2);
-       }
-
-       return NULL;
-}
-
-static const char* etcdWriter_getRootPath(bundle_context_pt context) {
-       const char* rootPath = NULL;
-       bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath);
-       if(rootPath == NULL) {
-               rootPath = DEFAULT_ETCD_ROOTPATH;
-       }
-       return rootPath;
-}
-

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/psd_activator.c b/pubsub/pubsub_discovery/private/src/psd_activator.c
deleted file mode 100644
index 89a517d..0000000
--- a/pubsub/pubsub_discovery/private/src/psd_activator.c
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include "bundle_activator.h"
-#include "service_tracker.h"
-#include "service_registration.h"
-#include "constants.h"
-#include "celix_log.h"
-
-#include "pubsub_common.h"
-#include "publisher_endpoint_announce.h"
-#include "pubsub_discovery.h"
-#include "pubsub_discovery_impl.h"
-
-struct activator {
-       bundle_context_pt context;
-       pubsub_discovery_pt pubsub_discovery;
-
-       service_tracker_pt pstmPublishersTracker;
-
-       publisher_endpoint_announce_pt publisherEPAnnounce;
-       service_registration_pt publisherEPAnnounceService;
-};
-
-static celix_status_t createTMPublisherAnnounceTracker(struct activator 
*activator, service_tracker_pt *tracker) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       service_tracker_customizer_pt customizer = NULL;
-
-       status = serviceTrackerCustomizer_create(activator->pubsub_discovery,
-                       NULL,
-                       pubsub_discovery_tmPublisherAnnounceAdded,
-                       pubsub_discovery_tmPublisherAnnounceModified,
-                       pubsub_discovery_tmPublisherAnnounceRemoved,
-                       &customizer);
-
-       if (status == CELIX_SUCCESS) {
-               status = serviceTracker_create(activator->context, (char *) 
PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE, customizer, tracker);
-       }
-
-       return status;
-}
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void 
**userData) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       struct activator* activator = calloc(1, sizeof(*activator));
-
-       if (activator) {
-               activator->context = context;
-               activator->pstmPublishersTracker = NULL;
-               activator->publisherEPAnnounce = NULL;
-               activator->publisherEPAnnounceService = NULL;
-
-               status = pubsub_discovery_create(context, 
&activator->pubsub_discovery);
-
-               if (status == CELIX_SUCCESS) {
-                       status = createTMPublisherAnnounceTracker(activator, 
&(activator->pstmPublishersTracker));
-               }
-
-               if (status == CELIX_SUCCESS) {
-                       *userData = activator;
-               } else {
-                       free(activator);
-               }
-       } else {
-               status = CELIX_ENOMEM;
-       }
-
-       return status;
-
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt 
context) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       struct activator *activator = userData;
-
-       publisher_endpoint_announce_pt pubEPAnnouncer = calloc(1, 
sizeof(*pubEPAnnouncer));
-
-       if (pubEPAnnouncer) {
-
-               pubEPAnnouncer->handle = activator->pubsub_discovery;
-               pubEPAnnouncer->announcePublisher = 
pubsub_discovery_announcePublisher;
-               pubEPAnnouncer->removePublisher = 
pubsub_discovery_removePublisher;
-               pubEPAnnouncer->interestedInTopic = 
pubsub_discovery_interestedInTopic;
-               pubEPAnnouncer->uninterestedInTopic = 
pubsub_discovery_uninterestedInTopic;
-               activator->publisherEPAnnounce = pubEPAnnouncer;
-
-               properties_pt props = properties_create();
-               properties_set(props, "PUBSUB_DISCOVERY", "true");
-
-               // pubsub_discovery_start needs to be first to initalize the 
propert etcd_watcher values
-               status = pubsub_discovery_start(activator->pubsub_discovery);
-
-               if (status == CELIX_SUCCESS) {
-                       status = 
serviceTracker_open(activator->pstmPublishersTracker);
-               }
-
-               if (status == CELIX_SUCCESS) {
-                       status = bundleContext_registerService(context, (char 
*) PUBSUB_DISCOVERY_SERVICE, pubEPAnnouncer, props, 
&activator->publisherEPAnnounceService);
-               }
-
-
-       }
-       else{
-               status = CELIX_ENOMEM;
-       }
-
-       if(status!=CELIX_SUCCESS && pubEPAnnouncer!=NULL){
-               free(pubEPAnnouncer);
-       }
-
-
-       return status;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt 
context) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator = userData;
-
-       status += pubsub_discovery_stop(activator->pubsub_discovery);
-
-       status += serviceTracker_close(activator->pstmPublishersTracker);
-
-       status += 
serviceRegistration_unregister(activator->publisherEPAnnounceService);
-
-       if (status == CELIX_SUCCESS) {
-               free(activator->publisherEPAnnounce);
-       }
-
-       return status;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt 
context) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator = userData;
-
-       status += serviceTracker_destroy(activator->pstmPublishersTracker);
-       status += pubsub_discovery_destroy(activator->pubsub_discovery);
-
-       activator->publisherEPAnnounce = NULL;
-       activator->publisherEPAnnounceService = NULL;
-       activator->pstmPublishersTracker = NULL;
-       activator->pubsub_discovery = NULL;
-       activator->context = NULL;
-
-       free(activator);
-
-       return status;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c 
b/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
deleted file mode 100644
index 94a8e11..0000000
--- a/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
+++ /dev/null
@@ -1,457 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#include <stdio.h>
-#include <string.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <stdbool.h>
-#include <netdb.h>
-#include <netinet/in.h>
-
-#include "constants.h"
-#include "celix_threads.h"
-#include "bundle_context.h"
-#include "array_list.h"
-#include "utils.h"
-#include "celix_errno.h"
-#include "filter.h"
-#include "service_reference.h"
-#include "service_registration.h"
-
-#include "publisher_endpoint_announce.h"
-#include "etcd_common.h"
-#include "etcd_watcher.h"
-#include "etcd_writer.h"
-#include "pubsub_endpoint.h"
-#include "pubsub_discovery_impl.h"
-
-/* Discovery activator functions */
-celix_status_t pubsub_discovery_create(bundle_context_pt context, 
pubsub_discovery_pt *ps_discovery) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       *ps_discovery = calloc(1, sizeof(**ps_discovery));
-
-       if (*ps_discovery == NULL) {
-               status = CELIX_ENOMEM;
-       }
-       else{
-               (*ps_discovery)->context = context;
-               (*ps_discovery)->discoveredPubs = 
hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-               (*ps_discovery)->listenerReferences = 
hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
-               (*ps_discovery)->watchers = 
hashMap_create(utils_stringHash,NULL,utils_stringEquals, NULL);
-               
celixThreadMutex_create(&(*ps_discovery)->listenerReferencesMutex, NULL);
-               celixThreadMutex_create(&(*ps_discovery)->discoveredPubsMutex, 
NULL);
-               celixThreadMutex_create(&(*ps_discovery)->watchersMutex, NULL);
-       }
-
-       return status;
-}
-
-celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt ps_discovery) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
-
-       hash_map_iterator_pt iter = 
hashMapIterator_create(ps_discovery->discoveredPubs);
-
-       while (hashMapIterator_hasNext(iter)) {
-               array_list_pt pubEP_list = (array_list_pt) 
hashMapIterator_nextValue(iter);
-
-               for(int i=0; i < arrayList_size(pubEP_list); i++) {
-                       
pubsubEndpoint_destroy(((pubsub_endpoint_pt)arrayList_get(pubEP_list,i)));
-               }
-               arrayList_destroy(pubEP_list);
-       }
-
-       hashMapIterator_destroy(iter);
-
-       hashMap_destroy(ps_discovery->discoveredPubs, true, false);
-       ps_discovery->discoveredPubs = NULL;
-
-       celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex);
-
-       celixThreadMutex_destroy(&ps_discovery->discoveredPubsMutex);
-
-
-       celixThreadMutex_lock(&ps_discovery->listenerReferencesMutex);
-
-       hashMap_destroy(ps_discovery->listenerReferences, false, false);
-       ps_discovery->listenerReferences = NULL;
-
-       celixThreadMutex_unlock(&ps_discovery->listenerReferencesMutex);
-
-       celixThreadMutex_destroy(&ps_discovery->listenerReferencesMutex);
-
-       free(ps_discovery);
-
-       return status;
-}
-
-celix_status_t pubsub_discovery_start(pubsub_discovery_pt ps_discovery) {
-    celix_status_t status = CELIX_SUCCESS;
-    status = etcdCommon_init(ps_discovery->context);
-    ps_discovery->writer = etcdWriter_create(ps_discovery);
-
-    return status;
-}
-
-celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) {
-    celix_status_t status = CELIX_SUCCESS;
-
-    const char* fwUUID = NULL;
-
-    bundleContext_getProperty(ps_discovery->context, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
-    if (fwUUID == NULL) {
-        printf("PSD: Cannot retrieve fwUUID.\n");
-        return CELIX_INVALID_BUNDLE_CONTEXT;
-    }
-
-    celixThreadMutex_lock(&ps_discovery->watchersMutex);
-
-    hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->watchers);
-    while (hashMapIterator_hasNext(iter)) {
-        struct watcher_info * wi = hashMapIterator_nextValue(iter);
-        etcdWatcher_stop(wi->watcher);
-    }
-    hashMapIterator_destroy(iter);
-
-    celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
-
-    /* Unexport all publishers for the local framework, and also delete from 
ETCD publisher belonging to the local framework */
-
-    iter = hashMapIterator_create(ps_discovery->discoveredPubs);
-    while (hashMapIterator_hasNext(iter)) {
-        array_list_pt pubEP_list = (array_list_pt) 
hashMapIterator_nextValue(iter);
-
-        int i;
-        for (i = 0; i < arrayList_size(pubEP_list); i++) {
-            pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) 
arrayList_get(pubEP_list, i);
-            if (strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
-                etcdWriter_deletePublisherEndpoint(ps_discovery->writer, 
pubEP);
-            } else {
-                pubsub_discovery_informPublishersListeners(ps_discovery, 
pubEP, false);
-                arrayList_remove(pubEP_list, i);
-                pubsubEndpoint_destroy(pubEP);
-                i--;
-            }
-        }
-    }
-
-    hashMapIterator_destroy(iter);
-
-    celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex);
-    etcdWriter_destroy(ps_discovery->writer);
-
-    iter = hashMapIterator_create(ps_discovery->watchers);
-    while (hashMapIterator_hasNext(iter)) {
-        struct watcher_info * wi = hashMapIterator_nextValue(iter);
-        etcdWatcher_destroy(wi->watcher);
-    }
-    hashMapIterator_destroy(iter);
-    hashMap_destroy(ps_discovery->watchers, true, true);
-    celixThreadMutex_unlock(&ps_discovery->watchersMutex);
-    return status;
-}
-
-/* Functions called by the etcd_watcher */
-
-celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt pubsub_discovery, 
pubsub_endpoint_pt pubEP) {
-       celix_status_t status = CELIX_SUCCESS;
-       bool inform=false;
-       celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
-
-       char *pubs_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
-       array_list_pt pubEP_list = 
(array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pubs_key);
-       if(pubEP_list==NULL){
-               arrayList_create(&pubEP_list);
-               arrayList_add(pubEP_list,pubEP);
-               
hashMap_put(pubsub_discovery->discoveredPubs,strdup(pubs_key),pubEP_list);
-               inform=true;
-       }
-       else{
-               int i;
-               bool found = false;
-               for(i=0;i<arrayList_size(pubEP_list) && !found;i++){
-                       found = 
pubsubEndpoint_equals(pubEP,(pubsub_endpoint_pt)arrayList_get(pubEP_list,i));
-               }
-               if(found){
-                       pubsubEndpoint_destroy(pubEP);
-               }
-               else{
-                       arrayList_add(pubEP_list,pubEP);
-                       inform=true;
-       }
-       }
-       free(pubs_key);
-
-       celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
-
-       if(inform){
-           status = 
pubsub_discovery_informPublishersListeners(pubsub_discovery,pubEP,true);
-       }
-
-       return status;
-}
-
-celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt 
pubsub_discovery, pubsub_endpoint_pt pubEP) {
-    celix_status_t status = CELIX_SUCCESS;
-    pubsub_endpoint_pt p = NULL;
-    bool found = false;
-
-    celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
-    char *pubs_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
-    array_list_pt pubEP_list = (array_list_pt) 
hashMap_get(pubsub_discovery->discoveredPubs, pubs_key);
-    free(pubs_key);
-    if (pubEP_list == NULL) {
-        printf("PSD: Cannot find any registered publisher for topic %s. 
Something is not consistent.\n", pubEP->topic);
-        status = CELIX_ILLEGAL_STATE;
-    } else {
-        int i;
-
-        for (i = 0; !found && i < arrayList_size(pubEP_list); i++) {
-            p = arrayList_get(pubEP_list, i);
-            found = pubsubEndpoint_equals(pubEP, p);
-            if (found) {
-                arrayList_remove(pubEP_list, i);
-                pubsubEndpoint_destroy(p);
-            }
-        }
-    }
-
-    celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
-    if (found) {
-        status = pubsub_discovery_informPublishersListeners(pubsub_discovery, 
pubEP, false);
-    }
-    pubsubEndpoint_destroy(pubEP);
-
-    return status;
-}
-
-/* Callback to the pubsub_topology_manager */
-celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt 
pubsub_discovery, pubsub_endpoint_pt pubEP, bool epAdded) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       // Inform listeners of new publisher endpoint
-       celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex);
-
-       if (pubsub_discovery->listenerReferences != NULL) {
-               hash_map_iterator_pt iter = 
hashMapIterator_create(pubsub_discovery->listenerReferences);
-               while (hashMapIterator_hasNext(iter)) {
-                       service_reference_pt reference = 
hashMapIterator_nextKey(iter);
-
-                       publisher_endpoint_announce_pt listener = NULL;
-
-                       bundleContext_getService(pubsub_discovery->context, 
reference, (void**) &listener);
-            if (epAdded) {
-                listener->announcePublisher(listener->handle, pubEP);
-            } else {
-                listener->removePublisher(listener->handle, pubEP);
-            }
-            bundleContext_ungetService(pubsub_discovery->context, reference, 
NULL);
-               }
-               hashMapIterator_destroy(iter);
-       }
-
-       celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
-
-       return status;
-}
-
-
-/* Service's functions implementation */
-celix_status_t pubsub_discovery_announcePublisher(void *handle, 
pubsub_endpoint_pt pubEP) {
-       celix_status_t status = CELIX_SUCCESS;
-       printf("pubsub_discovery_announcePublisher : %s / %s\n", pubEP->topic, 
pubEP->endpoint);
-       pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
-
-       celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
-
-       char *pub_key = createScopeTopicKey(pubEP->scope,pubEP->topic);
-       array_list_pt pubEP_list = 
(array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
-
-       if(pubEP_list==NULL){
-               arrayList_create(&pubEP_list);
-               
hashMap_put(pubsub_discovery->discoveredPubs,strdup(pub_key),pubEP_list);
-       }
-       free(pub_key);
-       pubsub_endpoint_pt p = NULL;
-       pubsubEndpoint_clone(pubEP, &p);
-
-       arrayList_add(pubEP_list,p);
-
-       status = 
etcdWriter_addPublisherEndpoint(pubsub_discovery->writer,p,true);
-
-       celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
-
-       return status;
-}
-
-celix_status_t pubsub_discovery_removePublisher(void *handle, 
pubsub_endpoint_pt pubEP) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
-
-       celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
-
-       char *pub_key = createScopeTopicKey(pubEP->scope,pubEP->topic);
-       array_list_pt pubEP_list = 
(array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
-       free(pub_key);
-       if(pubEP_list==NULL){
-               printf("PSD: Cannot find any registered publisher for topic %s. 
Something is not consistent.\n",pubEP->topic);
-               status = CELIX_ILLEGAL_STATE;
-       }
-       else{
-
-               int i;
-               bool found = false;
-               pubsub_endpoint_pt p = NULL;
-
-               for(i=0;!found && i<arrayList_size(pubEP_list);i++){
-                       p = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-                       found = pubsubEndpoint_equals(pubEP,p);
-               }
-
-               if(!found){
-                       printf("PSD: Trying to remove a not existing endpoint. 
Something is not consistent.\n");
-                       status = CELIX_ILLEGAL_STATE;
-               }
-               else{
-
-                       arrayList_removeElement(pubEP_list,p);
-
-                       status = 
etcdWriter_deletePublisherEndpoint(pubsub_discovery->writer,p);
-
-                       pubsubEndpoint_destroy(p);
-               }
-       }
-
-       celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
-
-       return status;
-}
-
-celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* 
scope, const char* topic) {
-    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
-
-    char *scope_topic_key = createScopeTopicKey(scope, topic);
-    celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
-    struct watcher_info * wi = hashMap_get(pubsub_discovery->watchers, 
scope_topic_key);
-    if(wi) {
-        wi->nr_references++;
-        free(scope_topic_key);
-    } else {
-        wi = calloc(1, sizeof(*wi));
-        etcdWatcher_create(pubsub_discovery, pubsub_discovery->context, scope, 
topic, &wi->watcher);
-        wi->nr_references = 1;
-        hashMap_put(pubsub_discovery->watchers, scope_topic_key, wi);
-    }
-
-    celixThreadMutex_unlock(&pubsub_discovery->watchersMutex);
-
-    return CELIX_SUCCESS;
-}
-
-celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* 
scope, const char* topic) {
-    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
-
-    char *scope_topic_key = createScopeTopicKey(scope, topic);
-    celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
-
-    hash_map_entry_pt entry =  hashMap_getEntry(pubsub_discovery->watchers, 
scope_topic_key);
-    if(entry) {
-        struct watcher_info * wi = hashMapEntry_getValue(entry);
-        wi->nr_references--;
-        if(wi->nr_references == 0) {
-            char *key = hashMapEntry_getKey(entry);
-            hashMap_remove(pubsub_discovery->watchers, scope_topic_key);
-            free(key);
-            free(scope_topic_key);
-            etcdWatcher_stop(wi->watcher);
-            etcdWatcher_destroy(wi->watcher);
-            free(wi);
-        }
-    } else {
-        fprintf(stderr, "[DISC] Inconsistency error: Removing unknown topic 
%s\n", topic);
-    }
-    celixThreadMutex_unlock(&pubsub_discovery->watchersMutex);
-    return CELIX_SUCCESS;
-}
-
-/* pubsub_topology_manager tracker callbacks */
-
-celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, 
service_reference_pt reference, void * service) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt)handle;
-       publisher_endpoint_announce_pt listener = 
(publisher_endpoint_announce_pt)service;
-
-       celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
-       celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex);
-
-       /* Notify the PSTM about discovered publisher endpoints */
-       hash_map_iterator_pt iter = 
hashMapIterator_create(pubsub_discovery->discoveredPubs);
-       while(hashMapIterator_hasNext(iter)){
-               array_list_pt pubEP_list = 
(array_list_pt)hashMapIterator_nextValue(iter);
-               int i;
-               for(i=0;i<arrayList_size(pubEP_list);i++){
-                       pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-                       status += listener->announcePublisher(listener->handle, 
pubEP);
-               }
-       }
-
-       hashMapIterator_destroy(iter);
-
-       hashMap_put(pubsub_discovery->listenerReferences, reference, NULL);
-
-       celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
-       celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
-
-       printf("PSD: pubsub_tm_announce_publisher added.\n");
-
-       return status;
-}
-
-celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, 
service_reference_pt reference, void * service) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       status = pubsub_discovery_tmPublisherAnnounceRemoved(handle, reference, 
service);
-       if (status == CELIX_SUCCESS) {
-               status = pubsub_discovery_tmPublisherAnnounceAdded(handle, 
reference, service);
-       }
-
-       return status;
-}
-
-celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, 
service_reference_pt reference, void * service) {
-       celix_status_t status = CELIX_SUCCESS;
-       pubsub_discovery_pt pubsub_discovery = handle;
-
-       celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex);
-
-       if (pubsub_discovery->listenerReferences != NULL) {
-               if (hashMap_remove(pubsub_discovery->listenerReferences, 
reference)) {
-                       printf("PSD: pubsub_tm_announce_publisher removed.\n");
-               }
-       }
-       celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
-
-       return status;
-}
-

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/public/include/pubsub_discovery.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/public/include/pubsub_discovery.h 
b/pubsub/pubsub_discovery/public/include/pubsub_discovery.h
deleted file mode 100644
index f77905a..0000000
--- a/pubsub/pubsub_discovery/public/include/pubsub_discovery.h
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef PUBSUB_DISCOVERY_H_
-#define PUBSUB_DISCOVERY_H_
-
-typedef struct pubsub_discovery *pubsub_discovery_pt;
-
-
-#endif /* PUBSUB_DISCOVERY_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/etcd_common.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/etcd_common.c 
b/pubsub/pubsub_discovery/src/etcd_common.c
new file mode 100644
index 0000000..c757801
--- /dev/null
+++ b/pubsub/pubsub_discovery/src/etcd_common.c
@@ -0,0 +1,82 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "celix_log.h"
+#include "constants.h"
+
+#include <curl/curl.h>
+#include "etcd.h"
+#include "etcd_watcher.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+
+#define MAX_ROOTNODE_LENGTH            128 /* NOTE(review): not referenced in etcd_common.c */
+#define MAX_LOCALNODE_LENGTH   4096 /* NOTE(review): not referenced in etcd_common.c */
+#define MAX_FIELD_LENGTH               128 /* NOTE(review): not referenced in etcd_common.c */
+
+#define CFG_ETCD_SERVER_IP             "PUBSUB_DISCOVERY_ETCD_SERVER_IP" /* property key read by etcdCommon_init() for the etcd host */
+#define DEFAULT_ETCD_SERVER_IP "127.0.0.1" /* host used when the property lookup fails or yields NULL */
+
+#define CFG_ETCD_SERVER_PORT   "PUBSUB_DISCOVERY_ETCD_SERVER_PORT" /* property key read by etcdCommon_init() for the etcd port */
+#define DEFAULT_ETCD_SERVER_PORT 2379 /* port used when the property is unset or is not accepted as a number */
+
+// be careful - this should be higher than the curl timeout
+#define CFG_ETCD_TTL   "DISCOVERY_ETCD_TTL" /* NOTE(review): not referenced in etcd_common.c; unlike the keys above it has no PUBSUB_ prefix */
+#define DEFAULT_ETCD_TTL 30 /* NOTE(review): not referenced in etcd_common.c; presumably seconds - confirm */
+
+
+/**
+ * Initialises the etcd client used by the pubsub discovery.
+ *
+ * The server address is taken from the PUBSUB_DISCOVERY_ETCD_SERVER_IP property and
+ * the port from PUBSUB_DISCOVERY_ETCD_SERVER_PORT. The compiled-in defaults are used
+ * when a property is missing, or when the port value is not a plain decimal number
+ * in the range 1..65535.
+ *
+ * @param context bundle context used to look up the configuration properties.
+ * @return CELIX_SUCCESS when etcd_init() returns 0, CELIX_BUNDLE_EXCEPTION otherwise.
+ */
+celix_status_t etcdCommon_init(bundle_context_pt context) {
+    celix_status_t status = CELIX_SUCCESS;
+    const char* etcd_server = NULL;
+    const char* etcd_port_string = NULL;
+    int etcd_port = DEFAULT_ETCD_SERVER_PORT;
+
+    if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_IP, &etcd_server) != CELIX_SUCCESS) || !etcd_server) {
+        etcd_server = DEFAULT_ETCD_SERVER_IP;
+    }
+
+    if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_PORT, &etcd_port_string) == CELIX_SUCCESS) && etcd_port_string) {
+        char* endptr = NULL;
+        long parsed = 0;
+
+        errno = 0;
+        parsed = strtol(etcd_port_string, &endptr, 10);
+
+        // Only accept the value when at least one digit was consumed, nothing trails
+        // it, strtol() reported no error and it is a usable TCP port. An empty string
+        // would otherwise silently become port 0, and large values would be truncated
+        // when narrowed from long to int.
+        if (endptr != etcd_port_string && *endptr == '\0' && errno == 0 && parsed > 0 && parsed <= 65535) {
+            etcd_port = (int) parsed;
+        }
+    }
+
+    printf("PSD: Using discovery HOST:PORT: %s:%i\n", etcd_server, etcd_port);
+
+    if (etcd_init(etcd_server, etcd_port, CURL_GLOBAL_DEFAULT) != 0) {
+        status = CELIX_BUNDLE_EXCEPTION;
+    }
+
+    return status;
+}
+

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/etcd_common.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/etcd_common.h 
b/pubsub/pubsub_discovery/src/etcd_common.h
new file mode 100644
index 0000000..7a3e7b6
--- /dev/null
+++ b/pubsub/pubsub_discovery/src/etcd_common.h
@@ -0,0 +1,28 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef ETCD_COMMON_H_
+#define ETCD_COMMON_H_
+
+#include "bundle_context.h"
+#include "celix_errno.h"
+
+celix_status_t etcdCommon_init(bundle_context_pt context);
+
+#endif /* ETCD_COMMON_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/etcd_watcher.c 
b/pubsub/pubsub_discovery/src/etcd_watcher.c
new file mode 100644
index 0000000..3c3a5a8
--- /dev/null
+++ b/pubsub/pubsub_discovery/src/etcd_watcher.c
@@ -0,0 +1,290 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "celix_log.h"
+#include "constants.h"
+
+#include "etcd.h"
+#include "etcd_watcher.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+
+
+#define MAX_ROOTNODE_LENGTH             128 /* byte bound etcdWatcher_getRootPath() passes to strncpy() */
+#define MAX_LOCALNODE_LENGTH            4096
+#define MAX_FIELD_LENGTH                128
+
+#define CFG_ETCD_ROOT_PATH              "PUBSUB_DISCOVERY_ETCD_ROOT_PATH" /* property key holding the etcd root path */
+#define DEFAULT_ETCD_ROOTPATH           "pubsub/discovery" /* root path used when the property lookup fails or yields NULL */
+
+#define CFG_ETCD_SERVER_IP              "PUBSUB_DISCOVERY_ETCD_SERVER_IP" /* same key is defined in etcd_common.c - keep in sync */
+#define DEFAULT_ETCD_SERVER_IP          "127.0.0.1" /* same value is defined in etcd_common.c - keep in sync */
+
+#define CFG_ETCD_SERVER_PORT            "PUBSUB_DISCOVERY_ETCD_SERVER_PORT" /* same key is defined in etcd_common.c - keep in sync */
+#define DEFAULT_ETCD_SERVER_PORT        2379 /* same value is defined in etcd_common.c - keep in sync */
+
+// be careful - this should be higher than the curl timeout
+#define CFG_ETCD_TTL                    "DISCOVERY_ETCD_TTL" /* NOTE(review): unlike the keys above this one has no PUBSUB_ prefix - confirm intended */
+#define DEFAULT_ETCD_TTL                30 /* presumably seconds - confirm */
+
+
+struct etcd_watcher { /* per-watcher state; the field notes below are assumptions drawn from names and types - confirm against the functions using them */
+       pubsub_discovery_pt pubsub_discovery; /* discovery instance this watcher belongs to (presumably the one notified of changes) */
+
+       celix_thread_mutex_t watcherLock; /* presumably guards the fields below - confirm */
+       celix_thread_t watcherThread; /* handle of the watcher's thread */
+
+       char *scope; /* presumably the scope being watched; ownership not visible here */
+       char *topic; /* presumably the topic being watched; ownership not visible here */
+       volatile bool running; /* presumably polled by the watcher thread as its stop flag - confirm */
+};
+
+struct etcd_writer { /* NOTE(review): writer state declared inside etcd_watcher.c - confirm this does not duplicate the definition used by etcd_writer.c */
+       pubsub_discovery_pt pubsub_discovery; /* discovery instance this writer belongs to */
+       celix_thread_mutex_t localPubsLock; /* presumably guards localPubs - confirm */
+       array_list_pt localPubs; /* presumably the locally announced publisher endpoints - confirm */
+       volatile bool running; /* presumably polled by the writer thread as its stop flag - confirm */
+       celix_thread_t writerThread; /* handle of the writer's thread */
+};
+
+
+// Writes "<root>/<scope>/<topic>" into rootNode (at most rootNodeLen bytes, as bounded
+// by snprintf). <root> is the PUBSUB_DISCOVERY_ETCD_ROOT_PATH property, or the default
+// root path when that property cannot be read. The resulting rootNode is expected not
+// to start with a slash. Always returns CELIX_SUCCESS.
+static celix_status_t etcdWatcher_getTopicRootPath(bundle_context_pt context, const char *scope, const char *topic, char* rootNode, int rootNodeLen) {
+       const char* configuredRoot = NULL;
+       celix_status_t lookup = bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &configuredRoot);
+       const char* base = (lookup == CELIX_SUCCESS && configuredRoot != NULL) ? configuredRoot : DEFAULT_ETCD_ROOTPATH;
+
+       snprintf(rootNode, rootNodeLen, "%s/%s/%s", base, scope, topic);
+
+       return CELIX_SUCCESS;
+}
+
+/*
+ * Copies the discovery root path (the CFG_ETCD_ROOT_PATH bundle property, or
+ * DEFAULT_ETCD_ROOTPATH when the property is missing) into rootNode.
+ *
+ * rootNode must point to a buffer of at least MAX_ROOTNODE_LENGTH bytes.
+ * A longer path is truncated; the result is always NUL-terminated.
+ * Always returns CELIX_SUCCESS.
+ */
+static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* rootNode) {
+       const char* rootPath = NULL;
+
+       if ((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath) != CELIX_SUCCESS) || (!rootPath)) {
+               rootPath = DEFAULT_ETCD_ROOTPATH;
+       }
+
+       // strncpy leaves the destination unterminated when the source fills it
+       // completely, so copy one byte less and terminate explicitly
+       strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH - 1);
+       rootNode[MAX_ROOTNODE_LENGTH - 1] = '\0';
+
+       return CELIX_SUCCESS;
+}
+
+
+/*
+ * Callback passed to etcd_get_directory(): turns one key/value pair into a
+ * publisher endpoint and hands it to the discovery instance given in arg.
+ * Entries whose key cannot be parsed are skipped.
+ */
+static void add_node(const char *key, const char *value, void* arg) {
+       pubsub_discovery_pt discovery = (pubsub_discovery_pt) arg;
+       pubsub_endpoint_pt endpoint = NULL;
+
+       celix_status_t rc = etcdWatcher_getPublisherEndpointFromKey(discovery, key, value, &endpoint);
+       if (rc != 0 || endpoint == NULL) {
+               return;
+       }
+
+       pubsub_discovery_addNode(discovery, endpoint);
+}
+
+/*
+ * Reads the etcd directory rootPath once and feeds every entry to add_node.
+ * highestModified is passed through to etcd_get_directory().
+ * Returns CELIX_ILLEGAL_ARGUMENT when etcd_get_directory() reports a
+ * non-zero result, CELIX_SUCCESS otherwise.
+ */
+static celix_status_t etcdWatcher_addAlreadyExistingPublishers(pubsub_discovery_pt ps_discovery, const char *rootPath, long long * highestModified) {
+       return etcd_get_directory(rootPath, add_node, ps_discovery, highestModified)
+                       ? CELIX_ILLEGAL_ARGUMENT
+                       : CELIX_SUCCESS;
+}
+
+// gets everything from provided key
+/*
+ * Parses an etcd key of the form
+ *   /<rootPath>/<scope>/<topic>/<frameworkUUID>/<serviceId>
+ * and creates the matching publisher endpoint.
+ *
+ * pubsub_discovery - discovery instance; its bundle context supplies rootPath
+ * etcdKey          - full key as reported by etcd
+ * etcdValue        - value stored under the key, passed on to pubsubEndpoint_create
+ * pubEP            - receives the endpoint on success
+ *
+ * Returns the status of pubsubEndpoint_create, CELIX_ILLEGAL_STATE when the
+ * key does not contain all four fields (or a field does not fit in
+ * MAX_FIELD_LENGTH - 1 characters), CELIX_ENOMEM when the scan pattern could
+ * not be allocated.
+ */
+celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsub_discovery, const char* etcdKey, const char* etcdValue, pubsub_endpoint_pt* pubEP) {
+
+       celix_status_t status = CELIX_SUCCESS;
+
+       char rootPath[MAX_ROOTNODE_LENGTH];
+       char *expr = NULL;
+       char scope[MAX_FIELD_LENGTH];
+       char topic[MAX_FIELD_LENGTH];
+       char fwUUID[MAX_FIELD_LENGTH];
+       char serviceId[MAX_FIELD_LENGTH];
+       // widest field sscanf may store while still leaving room for the terminator
+       const int fieldWidth = MAX_FIELD_LENGTH - 1;
+
+       memset(rootPath,0,MAX_ROOTNODE_LENGTH);
+       memset(scope,0,MAX_FIELD_LENGTH);
+       memset(topic,0,MAX_FIELD_LENGTH);
+       memset(fwUUID,0,MAX_FIELD_LENGTH);
+       memset(serviceId,0,MAX_FIELD_LENGTH);
+
+       etcdWatcher_getRootPath(pubsub_discovery->context, rootPath);
+       // the pattern below reads rootPath as a C string, make sure it is terminated
+       rootPath[MAX_ROOTNODE_LENGTH - 1] = '\0';
+
+       // every %[^/] gets an explicit width so an over-long key cannot overflow the field buffers
+       if (asprintf(&expr, "/%s/%%%d[^/]/%%%d[^/]/%%%d[^/]/%%%d[^/].*", rootPath, fieldWidth, fieldWidth, fieldWidth, fieldWidth) < 0) {
+               // the content of expr is undefined after a failed asprintf
+               return CELIX_ENOMEM;
+       }
+
+       int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, serviceId);
+       free(expr);
+
+       if (foundItems != 4) { // Could happen when a directory is removed, just don't process this.
+               status = CELIX_ILLEGAL_STATE;
+       }
+       else{
+               status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,NULL,pubEP);
+       }
+
+       return status;
+}
+
+/*
+ * performs (blocking) etcd_watch calls to check for
+ * changing discovery endpoint information within etcd.
+ *
+ * Thread body of a watcher (data is the etcd_watcher_pt). It first loads the
+ * publishers already present under "<root>/<scope>/<topic>", then keeps
+ * watching that directory until watcher->running is cleared:
+ *   set / create / update -> pubsub_discovery_addNode with the new value
+ *   delete / expire       -> pubsub_discovery_removeNode with the previous value
+ * The 'running' flag is sampled under watcherLock and the lock is released
+ * again before anything else happens, so the thread never exits holding it.
+ * Always returns NULL.
+ */
+static void* etcdWatcher_run(void* data) {
+       etcd_watcher_pt watcher = (etcd_watcher_pt) data;
+       time_t timeBeforeWatch = time(NULL);
+       char rootPath[MAX_ROOTNODE_LENGTH];
+       long long highestModified = 0;
+
+       pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery;
+       bundle_context_pt context = ps_discovery->context;
+
+       memset(rootPath, 0, MAX_ROOTNODE_LENGTH);
+
+       //TODO: add topic to etcd key
+       etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH);
+       etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, &highestModified);
+
+       for (;;) {
+
+               char *rkey = NULL;
+               char *value = NULL;
+               char *preValue = NULL;
+               char *action = NULL;
+               long long modIndex;
+               bool keepRunning;
+
+               // a failed lock ends the thread without touching the mutex again
+               if (celixThreadMutex_lock(&watcher->watcherLock) != CELIX_SUCCESS) {
+                       break;
+               }
+               keepRunning = watcher->running;
+               celixThreadMutex_unlock(&watcher->watcherLock);
+
+               if (!keepRunning) {
+                       break;
+               }
+
+               if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) {
+                       pubsub_endpoint_pt pubEP = NULL;
+                       bool added = (strcmp(action, "set") == 0) || (strcmp(action, "create") == 0) || (strcmp(action, "update") == 0);
+                       bool removed = (strcmp(action, "delete") == 0) || (strcmp(action, "expire") == 0);
+
+                       if (added) {
+                               if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
+                                       pubsub_discovery_addNode(ps_discovery, pubEP);
+                               }
+                       } else if (removed) {
+                               // the key is gone, so the endpoint has to be rebuilt from the previous value
+                               if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
+                                       pubsub_discovery_removeNode(ps_discovery, pubEP);
+                               }
+                       } else {
+                               fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action);
+                       }
+                       highestModified = modIndex;
+               } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) {
+                       /* prevent busy waiting, in case etcd_watch returns quickly without a result */
+                       sleep(DEFAULT_ETCD_TTL / 4);
+               }
+
+               FREE_MEM(action);
+               FREE_MEM(value);
+               FREE_MEM(preValue);
+               FREE_MEM(rkey);
+
+               // restart the back-off reference window once it has elapsed
+               if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) {
+                       timeBeforeWatch = time(NULL);
+               }
+
+       }
+
+       return NULL;
+}
+
+/*
+ * Allocates a watcher for the given scope/topic and starts its thread.
+ *
+ * pubsub_discovery - discovery instance notified by the watcher thread
+ * context          - not used by this function
+ * scope, topic     - copied with strdup; the caller keeps ownership of its strings
+ * watcher          - receives the new watcher, or NULL when allocation fails
+ *
+ * Returns CELIX_BUNDLE_EXCEPTION when pubsub_discovery is NULL, CELIX_ENOMEM
+ * when memory cannot be allocated, otherwise the status of celixThread_create.
+ * NOTE(review): when celixThread_create fails the watcher is still handed to
+ * the caller (with running == false), as before - confirm callers release it.
+ */
+celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_context_pt context, const char *scope, const char *topic, etcd_watcher_pt *watcher) {
+       celix_status_t status = CELIX_SUCCESS;
+
+
+       if (pubsub_discovery == NULL) {
+               return CELIX_BUNDLE_EXCEPTION;
+       }
+
+       (*watcher) = calloc(1, sizeof(struct etcd_watcher));
+
+       if(*watcher == NULL){
+               return CELIX_ENOMEM;
+       }
+
+       (*watcher)->pubsub_discovery = pubsub_discovery;
+       (*watcher)->scope = strdup(scope);
+       (*watcher)->topic = strdup(topic);
+
+       // the watcher thread formats both strings with %s, so a failed copy must not get that far
+       if ((*watcher)->scope == NULL || (*watcher)->topic == NULL) {
+               free((*watcher)->scope);
+               free((*watcher)->topic);
+               free(*watcher);
+               *watcher = NULL;
+               return CELIX_ENOMEM;
+       }
+
+       celixThreadMutex_create(&(*watcher)->watcherLock, NULL);
+
+       // hold the lock while the thread starts: it checks 'running' under the
+       // same lock and therefore only sees the flag after it has been set here
+       celixThreadMutex_lock(&(*watcher)->watcherLock);
+
+       status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher);
+       if (status == CELIX_SUCCESS) {
+               (*watcher)->running = true;
+       }
+
+       celixThreadMutex_unlock(&(*watcher)->watcherLock);
+
+
+       return status;
+}
+
+/*
+ * Releases a watcher: destroys its mutex and frees the scope/topic copies and
+ * the watcher itself. This function does not stop or join the watcher thread;
+ * etcdWatcher_stop does that.
+ * Always returns CELIX_SUCCESS.
+ */
+celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) {
+
+       celixThreadMutex_destroy(&(watcher->watcherLock));
+
+       free(watcher->scope);
+       free(watcher->topic);
+       free(watcher);
+
+       return CELIX_SUCCESS;
+}
+
+/*
+ * Asks the watcher thread to finish and waits for it: clears 'running' under
+ * watcherLock, then joins the thread. Always returns CELIX_SUCCESS.
+ */
+celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher){
+       // the thread reads the flag under the same lock
+       celixThreadMutex_lock(&watcher->watcherLock);
+       watcher->running = false;
+       celixThreadMutex_unlock(&watcher->watcherLock);
+
+       celixThread_join(watcher->watcherThread, NULL);
+
+       return CELIX_SUCCESS;
+}
+
+

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/etcd_watcher.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/etcd_watcher.h 
b/pubsub/pubsub_discovery/src/etcd_watcher.h
new file mode 100644
index 0000000..c425e60
--- /dev/null
+++ b/pubsub/pubsub_discovery/src/etcd_watcher.h
@@ -0,0 +1,38 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef ETCD_WATCHER_H_
+#define ETCD_WATCHER_H_
+
+#include "bundle_context.h"
+#include "celix_errno.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_endpoint.h"
+
+/* Opaque handle of a watcher; the struct is defined in etcd_watcher.c. */
+typedef struct etcd_watcher *etcd_watcher_pt;
+
+/* Allocates a watcher for scope/topic, copies both strings and starts the watcher thread. */
+celix_status_t etcdWatcher_create(pubsub_discovery_pt discovery,  
bundle_context_pt context, const char *scope, const char* topic, 
etcd_watcher_pt *watcher);
+/* Frees the watcher and its string copies; does not stop the thread. */
+celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher);
+/* Clears the running flag and joins the watcher thread. */
+celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher);
+
+/* Parses "/<root>/<scope>/<topic>/<fwUUID>/<serviceId>" from key and creates the endpoint in *pubEP. */
+celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt 
discovery, const char* key, const char* value, pubsub_endpoint_pt* pubEP);
+
+
+#endif /* ETCD_WATCHER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/etcd_writer.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/etcd_writer.c 
b/pubsub/pubsub_discovery/src/etcd_writer.c
new file mode 100644
index 0000000..1c423f3
--- /dev/null
+++ b/pubsub/pubsub_discovery/src/etcd_writer.c
@@ -0,0 +1,189 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "celix_log.h"
+#include "constants.h"
+
+#include "etcd.h"
+#include "etcd_writer.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+/* Size of the stack buffer used to format the directory key deleted per endpoint in etcdWriter_destroy. */
+#define MAX_ROOTNODE_LENGTH            128
+
+/* Bundle-context property holding the etcd root path, and the fallback used when it is not set. */
+#define CFG_ETCD_ROOT_PATH             "PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
+#define DEFAULT_ETCD_ROOTPATH  "pubsub/discovery"
+
+/* NOTE(review): the server IP/port settings are not referenced in this file - presumably consumed elsewhere; confirm. */
+#define CFG_ETCD_SERVER_IP             "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
+#define DEFAULT_ETCD_SERVER_IP "127.0.0.1"
+
+#define CFG_ETCD_SERVER_PORT   "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
+#define DEFAULT_ETCD_SERVER_PORT 2379
+
+// be careful - this should be higher than the curl timeout
+/* TTL passed to etcd_set for every announced key; the refresh thread re-announces every DEFAULT_ETCD_TTL / 2 seconds. */
+#define CFG_ETCD_TTL   "DISCOVERY_ETCD_TTL"
+#define DEFAULT_ETCD_TTL 30
+
+/*
+ * State of the writer: announces local publisher endpoints in etcd and keeps
+ * re-announcing them from a background thread.
+ */
+struct etcd_writer {
+       /* discovery instance whose bundle context supplies the configuration properties */
+       pubsub_discovery_pt pubsub_discovery;
+       /* guards localPubs */
+       celix_thread_mutex_t localPubsLock;
+       /* clones of the endpoints published by this framework; owned by the writer */
+       array_list_pt localPubs;
+       /* loop condition of the refresh thread; cleared in etcdWriter_destroy */
+       volatile bool running;
+       /* thread executing etcdWriter_run */
+       celix_thread_t writerThread;
+};
+
+
+/* Forward declarations of the helpers defined at the end of this file. */
+static const char* etcdWriter_getRootPath(bundle_context_pt context);
+static void* etcdWriter_run(void* data);
+
+
+/*
+ * Allocates a writer bound to the given discovery instance and starts its
+ * refresh thread (etcdWriter_run).
+ * Returns the new writer, or NULL when the allocation fails.
+ */
+etcd_writer_pt etcdWriter_create(pubsub_discovery_pt disc) {
+       etcd_writer_pt created = calloc(1, sizeof(struct etcd_writer));
+       if (created == NULL) {
+               return NULL;
+       }
+
+       celixThreadMutex_create(&created->localPubsLock, NULL);
+       arrayList_create(&created->localPubs);
+       created->pubsub_discovery = disc;
+       // set before the thread starts, the thread loops on this flag
+       created->running = true;
+       celixThread_create(&created->writerThread, NULL, etcdWriter_run, created);
+
+       return created;
+}
+
+/*
+ * Stops the refresh thread, deletes the etcd directory
+ * "<root>/<scope>/<topic>/<frameworkUUID>" of every locally stored endpoint,
+ * destroys those endpoints and finally frees the writer itself.
+ */
+void etcdWriter_destroy(etcd_writer_pt writer) {
+       char etcdDir[MAX_ROOTNODE_LENGTH];
+       const char *root = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+       // let the refresh thread run out before its list is torn down
+       writer->running = false;
+       celixThread_join(writer->writerThread, NULL);
+
+       celixThreadMutex_lock(&writer->localPubsLock);
+       int idx = 0;
+       while (idx < arrayList_size(writer->localPubs)) {
+               pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(writer->localPubs,idx);
+
+               memset(etcdDir,0,MAX_ROOTNODE_LENGTH);
+               snprintf(etcdDir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",root,ep->scope,ep->topic,ep->frameworkUUID);
+               etcd_del(etcdDir);
+
+               pubsubEndpoint_destroy(ep);
+               idx++;
+       }
+       arrayList_destroy(writer->localPubs);
+       celixThreadMutex_unlock(&writer->localPubsLock);
+
+       celixThreadMutex_destroy(&(writer->localPubsLock));
+       free(writer);
+}
+
+/*
+ * Announces pubEP in etcd under
+ *   "<root>/<scope>/<topic>/<frameworkUUID>/<serviceID>"
+ * with pubEP->endpoint as value and the configured TTL (CFG_ETCD_TTL property,
+ * DEFAULT_ETCD_TTL when it is missing or not a clean decimal number).
+ *
+ * writer  - writer owning the list of locally announced publishers
+ * pubEP   - endpoint to announce
+ * storeEP - when true and pubEP->frameworkUUID equals this framework's
+ *           OSGI_FRAMEWORK_FRAMEWORK_UUID, a clone is stored in localPubs so
+ *           that etcdWriter_run keeps refreshing it. etcdWriter_run itself
+ *           passes false because it already holds localPubsLock.
+ *
+ * Returns CELIX_SUCCESS when the key was written, CELIX_ENOMEM when the key
+ * could not be allocated and CELIX_BUNDLE_EXCEPTION when etcd_set fails.
+ */
+celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP, bool storeEP){
+       celix_status_t status = CELIX_SUCCESS;
+
+       if(storeEP){
+               const char *fwUUID = NULL;
+               bundleContext_getProperty(writer->pubsub_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+               if(fwUUID && strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
+                       celixThreadMutex_lock(&writer->localPubsLock);
+                       pubsub_endpoint_pt p = NULL;
+                       pubsubEndpoint_clone(pubEP, &p);
+                       // a failed clone must not end up in the list, the refresh thread dereferences every entry
+                       if (p != NULL) {
+                               arrayList_add(writer->localPubs,p);
+                       }
+                       celixThreadMutex_unlock(&writer->localPubsLock);
+               }
+       }
+
+       char *key = NULL;
+
+       const char* ttlStr = NULL;
+       int ttl = DEFAULT_ETCD_TTL;
+
+       // determine ttl
+       if ((bundleContext_getProperty(writer->pubsub_discovery->context, CFG_ETCD_TTL, &ttlStr) == CELIX_SUCCESS) && ttlStr) {
+               char* endptr = NULL;
+               errno = 0;
+               long parsed = strtol(ttlStr, &endptr, 10);
+               // endptr == ttlStr means no digits were consumed (e.g. an empty value)
+               if (endptr != ttlStr && *endptr == '\0' && errno == 0) {
+                       ttl = (int) parsed;
+               }
+       }
+
+       const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+       if (asprintf(&key,"%s/%s/%s/%s/%ld",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID,pubEP->serviceID) < 0) {
+               // the content of key is undefined after a failed asprintf
+               return CELIX_ENOMEM;
+       }
+
+       // NOTE(review): assumes etcd_set reports success with 0, like the
+       // etcd_del check in etcdWriter_deletePublisherEndpoint - confirm against etcd.h
+       if (etcd_set(key,pubEP->endpoint,ttl,false) != 0) {
+               status = CELIX_BUNDLE_EXCEPTION;
+       }
+       FREE_MEM(key);
+       return status;
+}
+
+/*
+ * Withdraws pubEP: removes (and destroys) the matching clone from localPubs
+ * so the refresh thread stops re-announcing it, then deletes the key
+ *   "<root>/<scope>/<topic>/<frameworkUUID>/<serviceID>"
+ * from etcd.
+ *
+ * Returns CELIX_SUCCESS, CELIX_ENOMEM when the key could not be allocated, or
+ * CELIX_ILLEGAL_ARGUMENT when etcd_del reports a failure.
+ */
+celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP) {
+       celix_status_t status = CELIX_SUCCESS;
+       char *key = NULL;
+
+       const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+       celixThreadMutex_lock(&writer->localPubsLock);
+       for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) {
+               pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i);
+               if (pubsubEndpoint_equals(ep, pubEP)) {
+                       arrayList_remove(writer->localPubs, i);
+                       pubsubEndpoint_destroy(ep);
+                       break;
+               }
+       }
+       celixThreadMutex_unlock(&writer->localPubsLock);
+
+       if (asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, pubEP->frameworkUUID, pubEP->serviceID) < 0) {
+               // the content of key is undefined after a failed asprintf; do not print or free it
+               return CELIX_ENOMEM;
+       }
+
+       if (etcd_del(key)) {
+               printf("Failed to remove key %s from ETCD\n",key);
+               status = CELIX_ILLEGAL_ARGUMENT;
+       }
+       FREE_MEM(key);
+       return status;
+}
+
+/*
+ * Thread body of the writer (data is the etcd_writer_pt): while 'running' is
+ * set, re-announces every endpoint in localPubs and then sleeps for
+ * DEFAULT_ETCD_TTL / 2 seconds. Always returns NULL.
+ */
+static void* etcdWriter_run(void* data) {
+       etcd_writer_pt self = (etcd_writer_pt)data;
+
+       while(self->running) {
+               celixThreadMutex_lock(&self->localPubsLock);
+               int idx = 0;
+               while (idx < arrayList_size(self->localPubs)) {
+                       pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(self->localPubs,idx);
+                       // storeEP is false: the endpoint is already in the list and the lock is held here
+                       etcdWriter_addPublisherEndpoint(self,ep,false);
+                       idx++;
+               }
+               celixThreadMutex_unlock(&self->localPubsLock);
+
+               sleep(DEFAULT_ETCD_TTL / 2);
+       }
+
+       return NULL;
+}
+
+/*
+ * Returns the etcd root path: the CFG_ETCD_ROOT_PATH bundle property when it
+ * is set, DEFAULT_ETCD_ROOTPATH otherwise. The returned string is not copied.
+ */
+static const char* etcdWriter_getRootPath(bundle_context_pt context) {
+       const char* configured = NULL;
+
+       bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &configured);
+
+       return (configured != NULL) ? configured : DEFAULT_ETCD_ROOTPATH;
+}
+

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/etcd_writer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/etcd_writer.h 
b/pubsub/pubsub_discovery/src/etcd_writer.h
new file mode 100644
index 0000000..3ff98b9
--- /dev/null
+++ b/pubsub/pubsub_discovery/src/etcd_writer.h
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef ETCD_WRITER_H_
+#define ETCD_WRITER_H_
+
+#include "bundle_context.h"
+#include "celix_errno.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_endpoint.h"
+
+/* Opaque handle of the writer; the struct is defined in etcd_writer.c. */
+typedef struct etcd_writer *etcd_writer_pt;
+
+
+/* Allocates a writer and starts its refresh thread; returns NULL when allocation fails. */
+etcd_writer_pt etcdWriter_create(pubsub_discovery_pt discovery);
+/* Joins the refresh thread, deletes the etcd directories of the stored endpoints and frees the writer. */
+void etcdWriter_destroy(etcd_writer_pt writer);
+
+/* Writes the endpoint to etcd; with storeEP set, endpoints of this framework are also kept for periodic refresh. */
+celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, 
pubsub_endpoint_pt pubEP,bool storeEP);
+/* Removes the endpoint from the refresh list and deletes its key from etcd. */
+celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, 
pubsub_endpoint_pt pubEP);
+
+
+#endif /* ETCD_WRITER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/psd_activator.c 
b/pubsub/pubsub_discovery/src/psd_activator.c
new file mode 100644
index 0000000..89a517d
--- /dev/null
+++ b/pubsub/pubsub_discovery/src/psd_activator.c
@@ -0,0 +1,171 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "bundle_activator.h"
+#include "service_tracker.h"
+#include "service_registration.h"
+#include "constants.h"
+#include "celix_log.h"
+
+#include "pubsub_common.h"
+#include "publisher_endpoint_announce.h"
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+/* Per-bundle state created in bundleActivator_create and handed back to the other activator hooks as userData. */
+struct activator {
+       /* bundle context received in bundleActivator_create */
+       bundle_context_pt context;
+       /* discovery instance: created in create, started/stopped in start/stop, destroyed in destroy */
+       pubsub_discovery_pt pubsub_discovery;
+
+       /* tracker for PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE services; opened in start, closed in stop */
+       service_tracker_pt pstmPublishersTracker;
+
+       /* service struct allocated in start and registered as PUBSUB_DISCOVERY_SERVICE */
+       publisher_endpoint_announce_pt publisherEPAnnounce;
+       /* registration handle of that service; unregistered in stop */
+       service_registration_pt publisherEPAnnounceService;
+};
+
+/*
+ * Creates (but does not open) the tracker for
+ * PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE services. The customizer forwards
+ * added/modified/removed events to the pubsub_discovery_tmPublisherAnnounce*
+ * callbacks with the discovery instance as handle.
+ * Returns the first non-success status of the two create calls.
+ */
+static celix_status_t createTMPublisherAnnounceTracker(struct activator *activator, service_tracker_pt *tracker) {
+       service_tracker_customizer_pt cust = NULL;
+
+       celix_status_t rc = serviceTrackerCustomizer_create(activator->pubsub_discovery,
+                       NULL,
+                       pubsub_discovery_tmPublisherAnnounceAdded,
+                       pubsub_discovery_tmPublisherAnnounceModified,
+                       pubsub_discovery_tmPublisherAnnounceRemoved,
+                       &cust);
+       if (rc != CELIX_SUCCESS) {
+               return rc;
+       }
+
+       return serviceTracker_create(activator->context, (char *) PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE, cust, tracker);
+}
+
+/*
+ * Bundle hook: allocates the activator, creates the discovery instance and
+ * the (not yet opened) publisher-announce tracker.
+ *
+ * On success *userData receives the activator. On failure everything created
+ * so far is released again and *userData is left untouched.
+ * Returns CELIX_ENOMEM when the activator cannot be allocated, otherwise the
+ * status of the failing create call or CELIX_SUCCESS.
+ */
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+       celix_status_t status = CELIX_SUCCESS;
+
+       // calloc already leaves the tracker and service fields NULL
+       struct activator* activator = calloc(1, sizeof(*activator));
+       if (activator == NULL) {
+               return CELIX_ENOMEM;
+       }
+
+       activator->context = context;
+
+       status = pubsub_discovery_create(context, &activator->pubsub_discovery);
+
+       if (status == CELIX_SUCCESS) {
+               status = createTMPublisherAnnounceTracker(activator, &(activator->pstmPublishersTracker));
+               if (status != CELIX_SUCCESS) {
+                       // the discovery instance exists at this point and would leak with the activator
+                       pubsub_discovery_destroy(activator->pubsub_discovery);
+               }
+       }
+
+       if (status == CELIX_SUCCESS) {
+               *userData = activator;
+       } else {
+               free(activator);
+       }
+
+       return status;
+
+}
+
+/*
+ * Bundle hook: starts the discovery instance, opens the publisher-announce
+ * tracker and registers the PUBSUB_DISCOVERY_SERVICE service (property
+ * "PUBSUB_DISCOVERY" = "true") whose functions delegate to pubsub_discovery_*.
+ *
+ * Returns CELIX_ENOMEM when the service struct cannot be allocated, otherwise
+ * the status of the first failing step or CELIX_SUCCESS. On failure the
+ * service struct is freed and activator->publisherEPAnnounce is reset to NULL.
+ */
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+       celix_status_t status = CELIX_SUCCESS;
+
+       struct activator *activator = userData;
+
+       publisher_endpoint_announce_pt pubEPAnnouncer = calloc(1, sizeof(*pubEPAnnouncer));
+       if (pubEPAnnouncer == NULL) {
+               return CELIX_ENOMEM;
+       }
+
+       pubEPAnnouncer->handle = activator->pubsub_discovery;
+       pubEPAnnouncer->announcePublisher = pubsub_discovery_announcePublisher;
+       pubEPAnnouncer->removePublisher = pubsub_discovery_removePublisher;
+       pubEPAnnouncer->interestedInTopic = pubsub_discovery_interestedInTopic;
+       pubEPAnnouncer->uninterestedInTopic = pubsub_discovery_uninterestedInTopic;
+       activator->publisherEPAnnounce = pubEPAnnouncer;
+
+       properties_pt props = properties_create();
+       properties_set(props, "PUBSUB_DISCOVERY", "true");
+
+       // pubsub_discovery_start needs to be first to initialize the proper etcd_watcher values
+       status = pubsub_discovery_start(activator->pubsub_discovery);
+
+       if (status == CELIX_SUCCESS) {
+               status = serviceTracker_open(activator->pstmPublishersTracker);
+       }
+
+       if (status == CELIX_SUCCESS) {
+               status = bundleContext_registerService(context, (char *) PUBSUB_DISCOVERY_SERVICE, pubEPAnnouncer, props, &activator->publisherEPAnnounceService);
+       }
+
+       if (status != CELIX_SUCCESS) {
+               // NOTE(review): props is not released on this path; who owns it after a
+               // failed registration is not visible here - confirm before destroying it
+               free(pubEPAnnouncer);
+               // do not leave a pointer to freed memory behind in the activator
+               activator->publisherEPAnnounce = NULL;
+       }
+
+       return status;
+}
+
+/*
+ * Bundle hook: stops the discovery instance, closes the tracker and
+ * unregisters the discovery service. The service struct is freed only when
+ * all three steps succeeded. Returns the sum of the three step statuses.
+ */
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+       struct activator *act = userData;
+       celix_status_t status = CELIX_SUCCESS;
+
+       // all three steps always run; their statuses are accumulated
+       status += pubsub_discovery_stop(act->pubsub_discovery);
+       status += serviceTracker_close(act->pstmPublishersTracker);
+       status += serviceRegistration_unregister(act->publisherEPAnnounceService);
+
+       if (status != CELIX_SUCCESS) {
+               return status;
+       }
+
+       free(act->publisherEPAnnounce);
+       return status;
+}
+
+/*
+ * Bundle hook: destroys the tracker and the discovery instance, clears the
+ * activator's fields and frees the activator.
+ * Returns the sum of the two destroy statuses.
+ */
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
+       struct activator *act = userData;
+       celix_status_t status = CELIX_SUCCESS;
+
+       status += serviceTracker_destroy(act->pstmPublishersTracker);
+       status += pubsub_discovery_destroy(act->pubsub_discovery);
+
+       // wipe the fields before the memory is handed back
+       act->context = NULL;
+       act->pubsub_discovery = NULL;
+       act->pstmPublishersTracker = NULL;
+       act->publisherEPAnnounceService = NULL;
+       act->publisherEPAnnounce = NULL;
+
+       free(act);
+
+       return status;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_discovery/src/pubsub_discovery.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/pubsub_discovery.h 
b/pubsub/pubsub_discovery/src/pubsub_discovery.h
new file mode 100644
index 0000000..f77905a
--- /dev/null
+++ b/pubsub/pubsub_discovery/src/pubsub_discovery.h
@@ -0,0 +1,26 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBSUB_DISCOVERY_H_
+#define PUBSUB_DISCOVERY_H_
+
+/* Handle type for the pubsub discovery instance; struct pubsub_discovery is only forward-declared in this header. */
+typedef struct pubsub_discovery *pubsub_discovery_pt;
+
+
+#endif /* PUBSUB_DISCOVERY_H_ */

Reply via email to