http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h 
b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
new file mode 100644
index 0000000..38a9127
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+#ifndef CELIX_PUBSUB_UDPMC_TOPIC_SENDER_H
+#define CELIX_PUBSUB_UDPMC_TOPIC_SENDER_H
+
+#include "celix_bundle_context.h"
+
+typedef struct pubsub_updmc_topic_sender pubsub_updmc_topic_sender_t; // Opaque sender handle: the struct is only forward-declared in this header. NOTE(review): the type is spelled updmc while the functions and the file name use udpmc - confirm which spelling is intended.
+
+pubsub_updmc_topic_sender_t* 
pubsub_udpmcTopicSender_create(celix_bundle_context_t *ctx, /*TODO rest args*/ 
const char *scope, const char *topic, long serializerSvcId); // Creates a sender from a bundle context, a scope, a topic and a serializer service id; the TODO marks the argument list as unfinished. NOTE(review): ownership of the returned pointer is not stated here - presumably released with pubsub_udpmcTopicSender_destroy, confirm.
+void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender); // Counterpart of pubsub_udpmcTopicSender_create; the implementation is not part of this header.
+
+const char* pubsub_udpmcTopicSender_psaType(pubsub_updmc_topic_sender_t 
*sender); // String accessors: each of the five declarations in this group takes a sender and returns a const char*. NOTE(review): lifetime of the returned strings is not documented here - confirm against the implementation.
+const char* pubsub_udpmcTopicSender_serializerType(pubsub_updmc_topic_sender_t 
*sender);
+const char* pubsub_udpmcTopicSender_scope(pubsub_updmc_topic_sender_t *sender);
+const char* pubsub_udpmcTopicSender_topic(pubsub_updmc_topic_sender_t *sender);
+const char* pubsub_udpmcTopicSender_socketAddress(pubsub_updmc_topic_sender_t 
*sender);
+//TODO connections etc
+
+void pubsub_udpmcTopicSender_connectTo(pubsub_updmc_topic_sender_t *sender, 
const celix_properties_t *endpoint); // endpoint is a const properties pointer, so the sender gets no write access through it; no status is returned to the caller.
+void pubsub_udpmcTopicSender_disconnectFrom(pubsub_updmc_topic_sender_t 
*sender, const celix_properties_t *endpoint); // Same parameter shape as pubsub_udpmcTopicSender_connectTo; no status is returned to the caller.
+
+#endif //CELIX_PUBSUB_UDPMC_TOPIC_SENDER_H

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c 
b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
index 046feb6..eea8460 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
@@ -98,12 +98,12 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const 
char* msgType, unsig
 static void delay_first_send_for_late_joiners(void);
 
 
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const 
char* best_serializer_type, char* bindIP, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
celix_properties_t *pubEP, pubsub_serializer_service_t *best_serializer, const 
char* best_serializer_type, char* bindIP, topic_publication_pt *out){
 
        char* ep = malloc(EP_ADDRESS_LEN);
        memset(ep,0,EP_ADDRESS_LEN);
 
-       long serviceId = celix_properties_getAsLong(pubEP->properties, 
OSGI_FRAMEWORK_SERVICE_ID, 0);
+       long serviceId = celix_properties_getAsLong(pubEP, 
OSGI_FRAMEWORK_SERVICE_ID, 0);
 
        unsigned int port = serviceId + rand_range(UDP_BASE_PORT+serviceId+3, 
UDP_MAX_PORT);
        snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
@@ -169,7 +169,7 @@ celix_status_t 
pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
 
        /* Let's register the new service */
 
-       pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
+       celix_properties_t *pubEP = arrayList_get(pub->pub_ep_list,0);
 
        if(pubEP!=NULL){
                service_factory_pt factory = calloc(1, sizeof(*factory));
@@ -178,8 +178,8 @@ celix_status_t 
pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
                factory->ungetService = pubsub_topicPublicationUngetService;
 
                properties_pt props = properties_create();
-               
properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->properties, 
PUBSUB_ENDPOINT_TOPIC_SCOPE));
-               
properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->properties, 
PUBSUB_ENDPOINT_TOPIC_NAME));
+               
properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_SCOPE));
+               
properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_NAME));
                 properties_set(props,"service.version", 
PUBSUB_PUBLISHER_SERVICE_VERSION);
 
 
@@ -188,8 +188,8 @@ celix_status_t 
pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
                if(status != CELIX_SUCCESS){
                        properties_destroy(props);
                        printf("[PSA UDPMC] Cannot register ServiceFactory for 
topic %s, topic %s.\n",
-                                  properties_get(pubEP->properties, 
PUBSUB_ENDPOINT_TOPIC_SCOPE),
-                                  properties_get(pubEP->properties, 
PUBSUB_ENDPOINT_TOPIC_NAME));
+                                  properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                                  properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_NAME));
                } else {
                        *svcFactory = factory;
                }
@@ -206,19 +206,19 @@ celix_status_t 
pubsub_topicPublicationStop(topic_publication_pt pub){
        return serviceRegistration_unregister(pub->svcFactoryReg);
 }
 
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, 
pubsub_endpoint_pt ep) {
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, 
celix_properties_t *ep) {
 
        celixThreadMutex_lock(&(pub->tp_lock));
-       pubsubEndpoint_setField(ep, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, 
pub->endpoint);
-       pubsubEndpoint_setField(ep, PUBSUB_ADMIN_TYPE_KEY, 
PSA_UDPMC_PUBSUB_ADMIN_TYPE);
-       pubsubEndpoint_setField(ep, PUBSUB_SERIALIZER_TYPE_KEY, 
pub->serializer.type);
+       celix_properties_set(ep, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, 
pub->endpoint);
+       celix_properties_set(ep, PUBSUB_ADMIN_TYPE_KEY, 
PSA_UDPMC_PUBSUB_ADMIN_TYPE);
+       celix_properties_set(ep, PUBSUB_SERIALIZER_TYPE_KEY, 
pub->serializer.type);
        arrayList_add(pub->pub_ep_list,ep);
        celixThreadMutex_unlock(&(pub->tp_lock));
 
        return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep) {
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,celix_properties_t *ep) {
 
        celixThreadMutex_lock(&(pub->tp_lock));
        arrayList_removeElement(pub->pub_ep_list,ep);
@@ -401,9 +401,9 @@ static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(to
                        
tp->serializer.svc->createSerializerMap(tp->serializer.svc->handle,bundle,&bound->msgTypes);
                }
 
-               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
-               bound->scope=strdup(properties_get(pubEP->properties, 
PUBSUB_ENDPOINT_TOPIC_SCOPE));
-               bound->topic=strdup(properties_get(pubEP->properties, 
PUBSUB_ENDPOINT_TOPIC_NAME));
+               celix_properties_t *pubEP = 
arrayList_get(bound->parent->pub_ep_list,0);
+               bound->scope=strdup(properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_SCOPE));
+               bound->topic=strdup(properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_NAME));
                bound->largeUdpHandle = largeUdp_create(1);
 
                bound->service.handle = bound;

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h 
b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
index e0a5698..0dc89fb 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
@@ -43,11 +43,11 @@ typedef struct pubsub_udp_msg {
 } pubsub_udp_msg_t;
 
 typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const 
char* best_serializer_type, char* bindIP, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
celix_properties_t *pubEP, pubsub_serializer_service_t *best_serializer, const 
char* best_serializer_type, char* bindIP, topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, 
pubsub_endpoint_pt ep);
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, 
celix_properties_t *ep);
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub, celix_properties_t *ep);
 
 celix_status_t pubsub_topicPublicationStart(bundle_context_pt 
bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
 celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c 
b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
index 82adf24..e08f219 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
@@ -391,7 +391,7 @@ celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
        return status;
 }
 
-celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
pubsub_endpoint_pt subEP){
+celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
celix_properties_t *subEP){
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
@@ -412,7 +412,7 @@ celix_status_t 
pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) {
        return status;
 }
 
-celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, pubsub_endpoint_pt subEP){
+celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, celix_properties_t *subEP){
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h 
b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h
index 475416a..c79e88c 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h
@@ -49,8 +49,8 @@ celix_status_t 
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic
 celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt 
ts, char* pubURL);
 celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* 
pubURL);
 
-celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
pubsub_endpoint_pt subEP);
-celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, pubsub_endpoint_pt subEP);
+celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
celix_properties_t *subEP);
+celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, celix_properties_t *subEP);
 
 array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt 
sub);
 celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt 
subscription);

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c 
b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
index 008dff5..3eb887b 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
@@ -16,171 +16,142 @@
  *specific language governing permissions and limitations
  *under the License.
  */
-/*
- * psa_activator.c
+
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
  *
- *  \date       Sep 30, 2011
- *  \author            <a href="mailto:dev@celix.apache.org">Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
  */
 
+
 #include <stdlib.h>
 
-#include "bundle_activator.h"
-#include "service_tracker.h"
+#include "celix_api.h"
+#include "pubsub_serializer.h"
 
 #include "pubsub_admin_impl.h"
 
+typedef struct psa_zmq_activator { // Activator state handed to psa_zmq_start and psa_zmq_stop in this file
+       pubsub_admin_t *admin; // created by pubsubAdmin_create in psa_zmq_start, destroyed in psa_zmq_stop
 
-struct activator {
-       pubsub_admin_pt admin;
-       pubsub_admin_service_pt adminService;
-       service_registration_pt registration;
-       service_tracker_pt serializerTracker;
-};
+       pubsub_admin_service_t adminService; // held by value; psa_zmq_start fills it and registers it under PUBSUB_ADMIN_SERVICE_NAME
+       long adminSvcId; // id returned when adminService is registered; set to -1 at the top of psa_zmq_start
 
+       command_service_t cmdSvc; // shell command service; its executeCommand is set to shellCommand in psa_zmq_start
+       long cmdSvcId; // id returned when cmdSvc is registered. NOTE(review): unlike the other two ids this one is not set to -1 in psa_zmq_start
 
-static celix_status_t shellCommand(void *handle, char * commandLine, FILE 
*outStream, FILE *errorStream) {
-    struct activator *act= (struct activator*)handle;
-    if (act->admin->externalPublications && 
!hashMap_isEmpty(act->admin->externalPublications)) {
-        fprintf(outStream, "External Publications:\n");
-        for(hash_map_iterator_t iter = 
hashMapIterator_construct(act->admin->externalPublications); 
hashMapIterator_hasNext(&iter);) {
-            const char* key = (const char*)hashMapIterator_nextKey(&iter);
-            fprintf(outStream, "    %s\n", key);
-        }
-    }
-    if (act->admin->localPublications && 
!hashMap_isEmpty(act->admin->localPublications)) {
-        fprintf(outStream, "Local Publications:\n");
-        for (hash_map_iterator_t iter = hashMapIterator_construct(
-                act->admin->localPublications); 
hashMapIterator_hasNext(&iter);) {
-            const char *key = (const char *) hashMapIterator_nextKey(&iter);
-            fprintf(outStream, "    %s\n", key);
-        }
-    }
-    if (act->admin->subscriptions && 
!hashMap_isEmpty(act->admin->subscriptions)) {
-        fprintf(outStream, "Active Subscriptions:\n");
-        for (hash_map_iterator_t iter = hashMapIterator_construct(
-                act->admin->subscriptions); hashMapIterator_hasNext(&iter);) {
-            const char *key = (const char *) hashMapIterator_nextKey(&iter);
-            fprintf(outStream, "    %s\n", key);
-        }
-    }
-    if (act->admin->pendingSubscriptions && 
!hashMap_isEmpty(act->admin->pendingSubscriptions)) {
-        fprintf(outStream, "Pending Subscriptions:\n");
-        for (hash_map_iterator_t iter = hashMapIterator_construct(
-                act->admin->pendingSubscriptions); 
hashMapIterator_hasNext(&iter);) {
-            const char *key = (const char *) hashMapIterator_nextKey(&iter);
-            fprintf(outStream, "    %s\n", key);
-        }
-    }
-    return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void 
**userData) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator;
+       long serializerTrackerId; // id returned by celix_bundleContext_trackServicesWithOptions; set to -1 at the top of psa_zmq_start
+} psa_zmq_activator_t;
 
-       activator = calloc(1, sizeof(*activator));
-       if (!activator) {
-               status = CELIX_ENOMEM;
+static celix_status_t shellCommand(void *handle, char * commandLine, FILE 
*outStream, FILE *errorStream) { // Shell command callback: prints the keys of four admin hash maps to outStream; commandLine and errorStream are not used in the body
+       psa_zmq_activator_t *act= handle; // handle is the activator: psa_zmq_start stores act in cmdSvc.handle
+       if (act->admin->externalPublications && 
!hashMap_isEmpty(act->admin->externalPublications)) { // a NULL or empty map prints nothing, not even its heading. NOTE(review): the maps are read without taking any lock in this function - confirm this is safe
+               fprintf(outStream, "External Publications:\n");
+               for(hash_map_iterator_t iter = 
hashMapIterator_construct(act->admin->externalPublications); 
hashMapIterator_hasNext(&iter);) { // no increment clause: the iterator is advanced by hashMapIterator_nextKey in the body
+                       const char* key = (const 
char*)hashMapIterator_nextKey(&iter);
+                       fprintf(outStream, "    %s\n", key);
+               }
        }
-       else{
-               *userData = activator;
-
-               status = pubsubAdmin_create(context, &(activator->admin));
-
-               if(status == CELIX_SUCCESS){
-                       service_tracker_customizer_pt customizer = NULL;
-                       status = 
serviceTrackerCustomizer_create(activator->admin,
-                                       NULL,
-                                       pubsubAdmin_serializerAdded,
-                                       NULL,
-                                       pubsubAdmin_serializerRemoved,
-                                       &customizer);
-                       if(status == CELIX_SUCCESS){
-                               status = serviceTracker_create(context, 
PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker));
-                if (status == CELIX_SUCCESS) {
-                    properties_pt shellProps = properties_create();
-                    properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, 
"psa_zmq_info");
-                    properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, 
"psa_zmq_info");
-                    properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, 
"psa_zmq_info: Overview of PubSub ZMQ Admin");
-                    activator->admin->shellCmdService.handle = activator;
-                    activator->admin->shellCmdService.executeCommand = 
shellCommand;
-                    bundleContext_registerService(context, 
OSGI_SHELL_COMMAND_SERVICE_NAME, &activator->admin->shellCmdService, 
shellProps, &activator->admin->shellCmdReg);
-                } else {
-                                       
serviceTrackerCustomizer_destroy(customizer);
-                                       pubsubAdmin_destroy(activator->admin);
-                               }
-                       }
-                       else{
-                               pubsubAdmin_destroy(activator->admin);
-                       }
+       if (act->admin->localPublications && 
!hashMap_isEmpty(act->admin->localPublications)) { // same listing pattern, for localPublications
+               fprintf(outStream, "Local Publications:\n");
+               for (hash_map_iterator_t iter = hashMapIterator_construct(
+                               act->admin->localPublications); 
hashMapIterator_hasNext(&iter);) {
+                       const char *key = (const char *) 
hashMapIterator_nextKey(&iter);
+                       fprintf(outStream, "    %s\n", key);
                }
        }
-       return status;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt 
context) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator = userData;
-       pubsub_admin_service_pt pubsubAdminSvc = calloc(1, 
sizeof(*pubsubAdminSvc));
-
-       if (!pubsubAdminSvc) {
-               status = CELIX_ENOMEM;
+       if (act->admin->subscriptions && 
!hashMap_isEmpty(act->admin->subscriptions)) { // same listing pattern, for subscriptions
+               fprintf(outStream, "Active Subscriptions:\n");
+               for (hash_map_iterator_t iter = hashMapIterator_construct(
+                               act->admin->subscriptions); 
hashMapIterator_hasNext(&iter);) {
+                       const char *key = (const char *) 
hashMapIterator_nextKey(&iter);
+                       fprintf(outStream, "    %s\n", key);
+               }
        }
-       else{
-               pubsubAdminSvc->admin = activator->admin;
-
-               pubsubAdminSvc->addPublication = pubsubAdmin_addPublication;
-               pubsubAdminSvc->removePublication = 
pubsubAdmin_removePublication;
-
-               pubsubAdminSvc->addSubscription = pubsubAdmin_addSubscription;
-               pubsubAdminSvc->removeSubscription = 
pubsubAdmin_removeSubscription;
-
-               pubsubAdminSvc->closeAllPublications = 
pubsubAdmin_closeAllPublications;
-               pubsubAdminSvc->closeAllSubscriptions = 
pubsubAdmin_closeAllSubscriptions;
-
-               pubsubAdminSvc->matchEndpoint = pubsubAdmin_matchEndpoint;
+       if (act->admin->pendingSubscriptions && 
!hashMap_isEmpty(act->admin->pendingSubscriptions)) { // same listing pattern, for pendingSubscriptions
+               fprintf(outStream, "Pending Subscriptions:\n");
+               for (hash_map_iterator_t iter = hashMapIterator_construct(
+                               act->admin->pendingSubscriptions); 
hashMapIterator_hasNext(&iter);) {
+                       const char *key = (const char *) 
hashMapIterator_nextKey(&iter);
+                       fprintf(outStream, "    %s\n", key);
+               }
+       }
+       return CELIX_SUCCESS; // always reports success; fprintf results are not checked
+}
 
-               activator->adminService = pubsubAdminSvc;
+int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) { // Start hook given to CELIX_GEN_BUNDLE_ACTIVATOR: creates the admin, starts the serializer tracker, then registers the admin service and the shell command service
+       act->adminSvcId = -1L; // presumably -1 marks an id as not registered - confirm against the celix bundle context API
+       act->serializerTrackerId = -1l; // NOTE(review): cmdSvcId is not reset here, unlike the two ids on these lines
 
-               status = bundleContext_registerService(context, 
PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);
+       celix_status_t status = pubsubAdmin_create(ctx, &(act->admin)); // the only assignment to status; this is also the value returned to the caller
 
-               status += serviceTracker_open(activator->serializerTracker);
+       //track pubsub serializers
+       if (status == CELIX_SUCCESS) {
+               celix_service_tracking_options_t opts = 
CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+               opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+               opts.filter.ignoreServiceLanguage = true;
 
+               opts.callbackHandle = act->admin; // the admin, not the activator, is the handle configured for the two callbacks below
+               opts.addWithProperties = pubsubAdmin_addSerializer;
+               opts.removeWithProperties = pubsubAdmin_removeSerializer;
+               act->serializerTrackerId = 
celix_bundleContext_trackServicesWithOptions(ctx, &opts); // NOTE(review): the returned tracker id is stored but not checked
        }
 
+       //register psa service
+       if (status == CELIX_SUCCESS) {
+               pubsub_admin_service_t *psaSvc = &act->adminService; // the service struct is a member of the activator, so nothing is allocated for it here
+               psaSvc->handle = act->admin;
+               psaSvc->matchPublisher = pubsubAdmin_matchPublisher;
+               psaSvc->matchSubscriber = pubsubAdmin_matchSubscriber;
+               psaSvc->matchEndpoint = pubsubAdmin_matchEndpoint;
+               psaSvc->setupTopicSender = pubsubAdmin_setupTopicSender;
+               psaSvc->teardownTopicSender = pubsubAdmin_teardownTopicSender;
+               psaSvc->setupTopicReciever = pubsubAdmin_setupTopicReciever;
+               psaSvc->teardownTopicReciever = 
pubsubAdmin_teardownTopicReciever;
+               psaSvc->addEndpoint = pubsubAdmin_addEndpoint;
+               psaSvc->removeEndpoint = pubsubAdmin_removeEndpoint;
+
+               celix_properties_t *props = celix_properties_create();
+               celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, 
PUBSUB_PSA_ZMQ_PSA_TYPE);
+
+               act->adminSvcId = celix_bundleContext_registerService(ctx, 
psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props); // NOTE(review): the returned id is not checked and status is not updated if registration fails
+       }
 
-       return status;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt 
context) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator = userData;
-
-       status += serviceTracker_close(activator->serializerTracker);
-       serviceRegistration_unregister(activator->admin->shellCmdReg);
-       activator->admin->shellCmdReg = NULL;
-       status += serviceRegistration_unregister(activator->registration);
+       //register psa shell command
+       if (status == CELIX_SUCCESS) {
+               act->cmdSvc.handle = act; // shellCommand receives the whole activator and reaches the admin through act->admin
+               act->cmdSvc.executeCommand = shellCommand;
 
-       activator->registration = NULL;
+               celix_properties_t *shellProps = celix_properties_create();
+               celix_properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, 
"psa_zmq_info");
+               celix_properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, 
"psa_zmq_info");
+               celix_properties_set(shellProps, 
OSGI_SHELL_COMMAND_DESCRIPTION, "psa_zmq_info: Overview of PubSub ZMQ Admin");
 
-       free(activator->adminService);
-       activator->adminService = NULL;
+               act->cmdSvcId = celix_bundleContext_registerService(ctx, 
&act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, shellProps); // NOTE(review): the returned id is not checked and status is not updated if registration fails
+       }
 
        return status;
 }
 
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt 
context) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator = userData;
-
-       serviceTracker_destroy(activator->serializerTracker);
-       pubsubAdmin_destroy(activator->admin);
-       activator->admin = NULL;
-       free(activator);
-
-       return status;
+int psa_zmq_stop(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) { // Stop hook given to CELIX_GEN_BUNDLE_ACTIVATOR: releases what psa_zmq_start set up
+       celix_bundleContext_unregisterService(ctx, act->adminSvcId);
+       celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
+       celix_bundleContext_stopTracker(ctx, act->serializerTrackerId);
+       pubsubAdmin_destroy(act->admin); // runs after both services are unregistered and the tracker is stopped. NOTE(review): act->admin is not cleared afterwards
+       return CELIX_SUCCESS; // always reports success; the result of pubsubAdmin_destroy is not inspected
 }
 
-
+CELIX_GEN_BUNDLE_ACTIVATOR(psa_zmq_activator_t, psa_zmq_start, psa_zmq_stop);

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c 
b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
index 0af73e4..34cc6f9 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
@@ -419,83 +419,107 @@ celix_status_t 
pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
         return pubsubAdmin_addAnySubscription(admin,subEP);
     }
 
-    /* Check if we already know some publisher about this topic, otherwise 
let's put the subscription in the pending hashmap */
-    celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
-    celixThreadMutex_lock(&admin->subscriptionsLock);
-    celixThreadMutex_lock(&admin->localPublicationsLock);
-    celixThreadMutex_lock(&admin->externalPublicationsLock);
-
-    char* scope_topic = 
pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->properties, 
PUBSUB_ENDPOINT_TOPIC_NAME));
+    pubsub_serializer_service_t *best_serializer = NULL;
+    const char *serType = NULL;
+    status = pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, 
&serType);
 
-    service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
-    array_list_pt ext_pub_list = 
(array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
 
-    if(factory==NULL && ext_pub_list==NULL){ //No (local or external) 
publishers yet for this topic
-        pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
+    if (status == CELIX_SUCCESS) {
+        //admin type and ser type now known -> update ep
+        pubsubEndpoint_setField(subEP, PUBSUB_ENDPOINT_ADMIN_TYPE, 
PSA_ZMQ_PUBSUB_ADMIN_TYPE);
+        pubsubEndpoint_setField(subEP, PUBSUB_ENDPOINT_SERIALIZER, serType);
+    } else {
+        printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. 
Adding it to pending list.\n",
+               properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
+        celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+        arrayList_add(admin->noSerializerSubscriptions,subEP);
+        celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
     }
-    else{
-        int i;
-        topic_subscription_pt subscription = hashMap_get(admin->subscriptions, 
scope_topic);
 
-        if(subscription == NULL) {
-            pubsub_serializer_service_t *best_serializer = NULL;
-            const char *serType = NULL;
-            if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer, &serType)) == CELIX_SUCCESS){
-                status += 
pubsub_topicSubscriptionCreate(admin->bundle_context, (char*) 
properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) 
properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, 
serType, &subscription);
-            }
-            else{
-                printf("PSA_ZMQ: Cannot find a serializer for subscribing 
topic %s. Adding it to pending list.\n",
-                        properties_get(subEP->properties, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-                celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                arrayList_add(admin->noSerializerSubscriptions,subEP);
-                celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-            }
+    char *scope_topic = NULL;
+    if (status == CELIX_SUCCESS) {
+        /* Check if we already know some publisher about this topic, otherwise 
let's put the subscription in the pending hashmap */
+        celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+        celixThreadMutex_lock(&admin->subscriptionsLock);
+        celixThreadMutex_lock(&admin->localPublicationsLock);
+        celixThreadMutex_lock(&admin->externalPublicationsLock);
 
-            if (status==CELIX_SUCCESS){
+        scope_topic = pubsubEndpoint_createScopeTopicKey(
+                properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
 
-                /* Try to connect internal publishers */
-                if(factory!=NULL){
-                    topic_publication_pt topic_pubs = 
(topic_publication_pt)factory->handle;
-                    array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
+        service_factory_pt factory = (service_factory_pt) 
hashMap_get(admin->localPublications, scope_topic);
+        array_list_pt ext_pub_list = (array_list_pt) 
hashMap_get(admin->externalPublications, scope_topic);
 
-                    if(topic_publishers!=NULL){
-                        for(i=0;i<arrayList_size(topic_publishers);i++){
-                            pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-                            if(properties_get(pubEP->properties, 
PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) !=NULL){
-                                status += 
pubsub_topicSubscriptionConnectPublisher(subscription,(char*)properties_get(pubEP->properties,
 PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
+        if (factory == NULL && ext_pub_list == NULL) { //No (local or 
external) publishers yet for this topic
+            pubsubAdmin_addSubscriptionToPendingList(admin, subEP);
+        } else {
+            int i;
+            topic_subscription_pt subscription = 
hashMap_get(admin->subscriptions, scope_topic);
+
+            if (subscription == NULL) {
+                status += pubsub_topicSubscriptionCreate(admin->bundle_context,
+                                                         (char *) 
properties_get(subEP->properties,
+                                                                               
  PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                                                         (char *) 
properties_get(subEP->properties,
+                                                                               
  PUBSUB_ENDPOINT_TOPIC_NAME),
+                                                         best_serializer, 
serType, &subscription);
+
+                if (status == CELIX_SUCCESS) {
+                    //got type and serializer -> update endpoint
+                    pubsubEndpoint_setField(subEP, PUBSUB_ENDPOINT_ADMIN_TYPE, 
PSA_ZMQ_PUBSUB_ADMIN_TYPE);
+                    pubsubEndpoint_setField(subEP, PUBSUB_ENDPOINT_SERIALIZER, 
serType);
+
+                    /* Try to connect internal publishers */
+                    if (factory != NULL) {
+                        topic_publication_pt topic_pubs = 
(topic_publication_pt) factory->handle;
+                        array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
+
+                        if (topic_publishers != NULL) {
+                            for (i = 0; i < arrayList_size(topic_publishers); 
i++) {
+                                pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt) arrayList_get(topic_publishers, i);
+                                if (properties_get(pubEP->properties, 
PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) != NULL) {
+                                    status += 
pubsub_topicSubscriptionConnectPublisher(subscription,
+                                                                               
        (char *) properties_get(
+                                                                               
                pubEP->properties,
+                                                                               
                PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
+                                }
                             }
+                            arrayList_destroy(topic_publishers);
                         }
-                        arrayList_destroy(topic_publishers);
-                    }
 
-                }
+                    }
 
-                /* Look also for external publishers */
-                if(ext_pub_list!=NULL){
-                    for(i=0;i<arrayList_size(ext_pub_list);i++){
-                        pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                        if(properties_get(pubEP->properties, 
PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) !=NULL){
-                            status += 
pubsub_topicSubscriptionConnectPublisher(subscription,(char*) 
properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
+                    /* Look also for external publishers */
+                    if (ext_pub_list != NULL) {
+                        for (i = 0; i < arrayList_size(ext_pub_list); i++) {
+                            pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) 
arrayList_get(ext_pub_list, i);
+                            if (properties_get(pubEP->properties, 
PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) != NULL) {
+                                status += 
pubsub_topicSubscriptionConnectPublisher(subscription,
+                                                                               
    (char *) properties_get(
+                                                                               
            pubEP->properties,
+                                                                               
            PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY));
+                            }
                         }
                     }
-                }
 
-                pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
+                    pubsub_topicSubscriptionAddSubscriber(subscription, subEP);
 
-                status += pubsub_topicSubscriptionStart(subscription);
+                    status += pubsub_topicSubscriptionStart(subscription);
 
-            }
+                }
 
-            if(status==CELIX_SUCCESS){
+                if (status == CELIX_SUCCESS) {
 
-                
hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
+                    hashMap_put(admin->subscriptions, strdup(scope_topic), 
subscription);
 
-                connectTopicPubSubToSerializer(admin, best_serializer, 
subscription, false);
+                    connectTopicPubSubToSerializer(admin, best_serializer, 
subscription, false);
+                }
             }
-        }
 
-        if (status == CELIX_SUCCESS){
-            pubsub_topicIncreaseNrSubscribers(subscription);
+            if (status == CELIX_SUCCESS) {
+                pubsub_topicIncreaseNrSubscribers(subscription);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h 
b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
index 4569a97..e8f80b1 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
@@ -39,11 +39,13 @@
 
 #include "pubsub_psa_zmq_constants.h"
 #include "pubsub_admin.h"
-#include "pubsub_admin_match.h"
+#include "pubsub_utils.h"
 #include "log_helper.h"
 #include "command.h"
 
-struct pubsub_admin {
+#define PUBSUB_PSA_ZMQ_PSA_TYPE                "zmq"
+
+typedef struct pubsub_admin {
 
        bundle_context_pt bundle_context;
        log_helper_pt loghelper;
@@ -82,31 +84,37 @@ struct pubsub_admin {
     unsigned int basePort;
     unsigned int maxPort;
 
-       command_service_t shellCmdService;
-       service_registration_pt  shellCmdReg;
-
        double qosSampleScore;
        double qosControlScore;
        double defaultScore;
 
        bool verbose;
-};
+} pubsub_admin_t;
+
+celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_t 
**admin);
+celix_status_t pubsubAdmin_destroy(pubsub_admin_t *admin);
+
+void pubsubAdmin_addSerializer(void * handle, void *svc, const 
celix_properties_t *properties);
+void pubsubAdmin_removeSerializer(void * handle, void *svc, const 
celix_properties_t *properties);
+
+
 
-celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt 
*admin);
-celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin);
+//for the pubsub_admin_service
 
-celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
-celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
+celix_status_t pubsubAdmin_matchPublisher(void *handle, long 
svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long 
*serializerSvcId);
+celix_status_t pubsubAdmin_matchSubscriber(void *handle, long 
svcProviderBndId, const celix_properties_t *svcProperties, double *score, long 
*serializerSvcId);
+celix_status_t pubsubAdmin_matchEndpoint(void *handle, const 
celix_properties_t *endpoint, double *score);
 
-celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt 
admin,pubsub_endpoint_pt pubEP);
-celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt 
admin,pubsub_endpoint_pt pubEP);
+//note endpoint is owned by caller
+celix_status_t pubsubAdmin_setupTopicSender(void *handle, const char *scope, 
const char *topic, long serializerSvcId, celix_properties_t 
**publisherEndpoint);
+celix_status_t pubsubAdmin_teardownTopicSender(void *handle, const char 
*scope, const char *topic);
 
-celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* 
scope, char* topic);
-celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* 
scope,char* topic);
+//note endpoint is owned by caller
+celix_status_t pubsubAdmin_setupTopicReciever(void *handle, const char *scope, 
const char *topic, long serializerSvcId, celix_properties_t 
**subscriberEndpoint);
+celix_status_t pubsubAdmin_teardownTopicReciever(void *handle, const char 
*scope, const char *topic);
 
-celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt 
reference, void * service);
-celix_status_t pubsubAdmin_serializerRemoved(void * handle, 
service_reference_pt reference, void * service);
+celix_status_t pubsubAdmin_addEndpoint(void *handle, const celix_properties_t 
*endpoint);
+celix_status_t pubsubAdmin_removeEndpoint(void *handle, const 
celix_properties_t *endpoint);
 
-celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, 
pubsub_endpoint_pt endpoint, double* score);
 
 #endif /* PUBSUB_ADMIN_ZMQ_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c 
b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c
index 35da907..07f4466 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c
@@ -113,7 +113,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const 
char* msgType, unsig
 
 static void delay_first_send_for_late_joiners(void);
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const 
char* serType, char* bindIP, unsigned int basePort, unsigned int maxPort, 
topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, 
celix_properties_t *pubEP, pubsub_serializer_service_t *best_serializer, const 
char* serType, char* bindIP, unsigned int basePort, unsigned int maxPort, 
topic_publication_pt *out){
        celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -279,7 +279,7 @@ celix_status_t 
pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
 
        /* Let's register the new service */
 
-       pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
+       celix_properties_t *pubEP = 
(pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
 
        if(pubEP!=NULL){
                service_factory_pt factory = calloc(1, sizeof(*factory));
@@ -315,7 +315,7 @@ celix_status_t 
pubsub_topicPublicationStop(topic_publication_pt pub){
        return serviceRegistration_unregister(pub->svcFactoryReg);
 }
 
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, 
pubsub_endpoint_pt ep) {
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, 
celix_properties_t *ep) {
 
        celixThreadMutex_lock(&(pub->tp_lock));
        pubsubEndpoint_setField(ep, PUBSUB_ADMIN_TYPE_KEY, 
PSA_ZMQ_PUBSUB_ADMIN_TYPE);
@@ -327,11 +327,11 @@ celix_status_t 
pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, p
        return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep){
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,celix_properties_t *ep){
 
        celixThreadMutex_lock(&(pub->tp_lock));
        for (int i = 0; i < arrayList_size(pub->pub_ep_list); i++) {
-               pubsub_endpoint_pt e = arrayList_get(pub->pub_ep_list, i);
+               celix_properties_t *e = arrayList_get(pub->pub_ep_list, i);
                if(pubsubEndpoint_equals(ep, e)) {
                    arrayList_removeElement(pub->pub_ep_list,ep);
                    break;
@@ -588,7 +588,7 @@ static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(to
 
                arrayList_create(&bound->mp_parts);
 
-               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
+               celix_properties_t *pubEP = 
(pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
                bound->topic=strdup(properties_get(pubEP->properties, 
PUBSUB_ENDPOINT_TOPIC_NAME));
 
                bound->service.handle = bound;

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h 
b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h
index 20e4a8e..0fab6d7 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h
@@ -35,11 +35,11 @@
 
 typedef struct topic_publication *topic_publication_pt;
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt 
bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t 
*best_serializer, const char *serType, char* bindIP, unsigned int basePort, 
unsigned int maxPort, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, 
celix_properties_t *endpoint, pubsub_serializer_service_t *best_serializer, 
const char *serType, char* bindIP, unsigned int basePort, unsigned int maxPort, 
topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, 
celix_properties_t *ep);
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub, celix_properties_t *ep);
 
 celix_status_t pubsub_topicPublicationStart(bundle_context_pt 
bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
 celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.h 
b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.h
index 6dca4e5..03dfd9e6a 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.h
@@ -49,8 +49,8 @@ celix_status_t 
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic
 celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt 
ts, char* pubURL);
 celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* 
pubURL);
 
-celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
pubsub_endpoint_pt subEP);
-celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, pubsub_endpoint_pt subEP);
+celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
celix_properties_t *subEP);
+celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, celix_properties_t *subEP);
 
 array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt 
sub);
 celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt 
subscription);

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c 
b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index fcf0823..98d6be6 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -79,6 +79,7 @@ celix_status_t pubsub_discovery_create(bundle_context_pt 
context, pubsub_discove
     (*ps_discovery)->ttlForEntries = (int)ttl;
     (*ps_discovery)->sleepInsecBetweenTTLRefresh = (int)(((float)ttl)/2.0);
     (*ps_discovery)->pubsubPath = celix_bundleContext_getProperty(context, 
PUBSUB_DISCOVERY_SERVER_PATH_KEY, PUBSUB_DISCOVERY_SERVER_PATH_DEFAULT);
+    (*ps_discovery)->fwUUID = celix_bundleContext_getProperty(context, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
 
     return status;
 }
@@ -111,39 +112,58 @@ celix_status_t 
pubsub_discovery_destroy(pubsub_discovery_t *ps_discovery) {
     return status;
 }
 
-void* psd_watch(void *data) {
-    pubsub_discovery_t *disc = data;
 
-    celixThreadMutex_lock(&disc->runningMutex);
-    bool running = disc->running;
-    celixThreadMutex_unlock(&disc->runningMutex);
+static void psd_etcdReadCallback(const char *key __attribute__((unused)), 
const char *value, void* arg) {
+    pubsub_discovery_t *disc = arg;
+    celix_properties_t *props = pubsub_discovery_parseEndpoint(value);
+    if (props != NULL) {
+        pubsub_discovery_addDiscoveredEndpoint(disc, props);
+    }
+}
 
-    long long prevIndex = 0L;
+static void psd_watchSetupConnection(pubsub_discovery_t *disc, bool 
*connectedPtr, long long *mIndex) {
+    bool connected = *connectedPtr;
+    if (!connected) {
+        if (disc->verbose) {
+            printf("[PSD] Reading etcd directory at %s\n", disc->pubsubPath);
+        }
+        int rc = etcd_get_directory(disc->pubsubPath, psd_etcdReadCallback, 
disc, mIndex);
+        if (rc == ETCDLIB_RC_OK) {
+            *connectedPtr = true;
+        } else {
+            *connectedPtr = false;
+        }
+    }
+}
+
+static void psd_watchForChange(pubsub_discovery_t *disc, bool *connectedPtr, 
long long *mIndex) {
+    bool connected = *connectedPtr;
+    if (connected) {
+        long long watchIndex = *mIndex + 1;
 
-    while (running) {
         char *action = NULL;
         char *value = NULL;
         char *readKey = NULL;
-        long long mIndex;
         //TODO add interruptable etcd_wait -> which returns a handle to 
interrupt and a can be used for a wait call
-        int rc = etcd_watch(disc->pubsubPath, prevIndex, &action, NULL, 
&value, &readKey, &mIndex);
+        int rc = etcd_watch(disc->pubsubPath, watchIndex, &action, NULL, 
&value, &readKey, mIndex);
         if (rc == ETCDLIB_RC_TIMEOUT) {
             //nop
-        } else if (rc == ETCDLIB_RC_ERROR) {
-            printf("WARNING PSD: Error communicating with etcd.\n");
+        } else if (rc == ETCDLIB_RC_ERROR || action == NULL) {
+            printf("[PSD] Error communicating with etcd. rc is %i, action 
value is %s\n", rc, action);
+            *connectedPtr = false;
         } else {
             if (strncmp(ETCDLIB_ACTION_CREATE, action, 
strlen(ETCDLIB_ACTION_CREATE)) == 0 ||
-                strncmp(ETCDLIB_ACTION_SET, action, 
strlen(ETCDLIB_ACTION_SET)) == 0 ||
-                strncmp(ETCDLIB_ACTION_UPDATE, action, 
strlen(ETCDLIB_ACTION_UPDATE)) == 0) {
+                       strncmp(ETCDLIB_ACTION_SET, action, 
strlen(ETCDLIB_ACTION_SET)) == 0 ||
+                       strncmp(ETCDLIB_ACTION_UPDATE, action, 
strlen(ETCDLIB_ACTION_UPDATE)) == 0) {
                 celix_properties_t *props = 
pubsub_discovery_parseEndpoint(value);
                 if (props != NULL) {
                     pubsub_discovery_addDiscoveredEndpoint(disc, props);
                 }
             } else if (strncmp(ETCDLIB_ACTION_DELETE, action, 
strlen(ETCDLIB_ACTION_DELETE)) == 0 ||
-                        strncmp(ETCDLIB_ACTION_EXPIRE, action, 
strlen(ETCDLIB_ACTION_EXPIRE)) == 0) {
-                celix_properties_t *props = 
pubsub_discovery_parseEndpoint(value);
-                if (props != NULL) {
-                    const char *uuid = celix_properties_get(props, 
PUBSUB_ENDPOINT_UUID, NULL);
+                       strncmp(ETCDLIB_ACTION_EXPIRE, action, 
strlen(ETCDLIB_ACTION_EXPIRE)) == 0) {
+                char *uuid = strrchr(readKey, '/');
+                if (uuid != NULL) {
+                    uuid = uuid + 1;
                     pubsub_discovery_removeDiscoveredEndpoint(disc, uuid);
                 }
             } else {
@@ -153,9 +173,61 @@ void* psd_watch(void *data) {
             free(action);
             free(value);
             free(readKey);
-            prevIndex = mIndex;
         }
+    } else {
+        if (disc->verbose) {
+            printf("[PSD] Skipping etcd watch -> not connected\n");
+        }
+    }
+}
+
+static void psd_cleanupIfDisconnected(pubsub_discovery_t *disc, bool 
*connectedPtr) {
+    bool connected = *connectedPtr;
+    if (!connected) {
+
+        celixThreadMutex_lock(&disc->discoveredEndpointsMutex);
+        int size = hashMap_size(disc->discoveredEndpoints);
+        if (disc->verbose) {
+            printf("[PSD] Removing all discovered entries (%i) -> not 
connected\n", size);
+        }
+
+        hash_map_iterator_t iter = 
hashMapIterator_construct(disc->discoveredEndpoints);
+        while (hashMapIterator_hasNext(&iter)) {
+            celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
+
+            celixThreadMutex_lock(&disc->discoveredEndpointsListenersMutex);
+            hash_map_iterator_t iter2 = 
hashMapIterator_construct(disc->discoveredEndpointsListeners);
+            while (hashMapIterator_hasNext(&iter2)) {
+                pubsub_discovered_endpoint_listener_t *listener = 
hashMapIterator_nextValue(&iter2);
+                listener->removeDiscoveredEndpoint(listener->handle, endpoint);
+            }
+            celixThreadMutex_unlock(&disc->discoveredEndpointsListenersMutex);
+
+            celix_properties_destroy(endpoint);
+        }
+        hashMap_clear(disc->discoveredEndpoints, false, false);
+        celixThreadMutex_unlock(&disc->discoveredEndpointsMutex);
+    }
+}
+
+void* psd_watch(void *data) {
+    pubsub_discovery_t *disc = data;
+
+    long long mIndex = 0L;
+    bool connected = false;
 
+    celixThreadMutex_lock(&disc->runningMutex);
+    bool running = disc->running;
+    celixThreadMutex_unlock(&disc->runningMutex);
+
+    while (running) {
+        psd_watchSetupConnection(disc, &connected, &mIndex);
+        psd_watchForChange(disc, &connected, &mIndex);
+        psd_cleanupIfDisconnected(disc, &connected);
+
+        if (!connected) {
+            usleep(5000000); //if not connected wait a few seconds
+        }
 
         celixThreadMutex_lock(&disc->runningMutex);
         running = disc->running;
@@ -173,8 +245,8 @@ void* psd_refresh(void *data) {
     celixThreadMutex_unlock(&disc->runningMutex);
 
     while (running) {
-        struct timeval start;
-        gettimeofday(&start, NULL);
+        struct timespec start;
+        clock_gettime(CLOCK_REALTIME, &start);
 
         celixThreadMutex_lock(&disc->announcedEndpointsMutex);
         hash_map_iterator_t iter = 
hashMapIterator_construct(disc->announcedEndpoints);
@@ -199,21 +271,11 @@ void* psd_refresh(void *data) {
         }
         celixThreadMutex_unlock(&disc->announcedEndpointsMutex);
 
-        struct timeval end;
-        gettimeofday(&end, NULL);
-
-        double s = start.tv_sec + (start.tv_usec / 1000.0 / 1000.0 );
-        double e = end.tv_sec + (end.tv_usec / 1000.0 / 1000.0 );
-        double elapsedInsec = e - s;
-        double sleepNeededInSec = disc->sleepInsecBetweenTTLRefresh - 
elapsedInsec;
-        if (sleepNeededInSec > 0) {
-            celixThreadMutex_lock(&disc->waitMutex);
-            double waitTill = sleepNeededInSec + end.tv_sec + (end.tv_usec / 
1000.0 / 1000.0);
-            long sec = (long)waitTill;
-            long nsec = (long)((waitTill - sec) * 1000 * 1000 * 1000);
-            celixThreadCondition_timedwait(&disc->waitCond, &disc->waitMutex, 
sec, nsec);
-            celixThreadMutex_unlock(&disc->waitMutex);
-        }
+        struct timespec waitTill = start;
+        waitTill.tv_sec += disc->sleepInsecBetweenTTLRefresh;
+        celixThreadMutex_lock(&disc->waitMutex);
+        pthread_cond_timedwait(&disc->waitCond, &disc->waitMutex, &waitTill); 
//TODO add timedwait abs for celixThread
+        celixThreadMutex_unlock(&disc->waitMutex);
 
         celixThreadMutex_lock(&disc->runningMutex);
         running = disc->running;
@@ -226,7 +288,10 @@ celix_status_t pubsub_discovery_start(pubsub_discovery_t 
*ps_discovery) {
     celix_status_t status = CELIX_SUCCESS;
 
     celixThread_create(&ps_discovery->watchThread, NULL, psd_watch, 
ps_discovery);
+    celixThread_setName(&ps_discovery->watchThread, "PubSub ETCD Watch");
     celixThread_create(&ps_discovery->refreshTTLThread, NULL, psd_refresh, 
ps_discovery);
+    celixThread_setName(&ps_discovery->refreshTTLThread, "PubSub ETCD Refresh 
TTL");
+
     return status;
 }
 
@@ -244,7 +309,6 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_t 
*disc) {
     celixThread_join(disc->watchThread, NULL);
     celixThread_join(disc->refreshTTLThread, NULL);
 
-    //TODO NOTE double lock , check if this is always done in the same order
     celixThreadMutex_lock(&disc->discoveredEndpointsMutex);
     hash_map_iterator_t iter = 
hashMapIterator_construct(disc->discoveredEndpoints);
     while (hashMapIterator_hasNext(&iter)) {
@@ -367,7 +431,27 @@ celix_status_t pubsub_discovery_removeEndpoint(void 
*handle, const celix_propert
 
 static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, 
celix_properties_t *endpoint) {
     const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, 
NULL);
-    assert(uuid != NULL); //note endpoint should already be check to be valid 
pubsubEndpoint_isValid
+    const char *fwUUID = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL);
+
+    //note: the endpoint should already have been checked for validity (pubsubEndpoint_isValid)
+    assert(uuid != NULL);
+    assert(fwUUID != NULL);
+
+    if (fwUUID != NULL && strncmp(disc->fwUUID, fwUUID, strlen(disc->fwUUID)) 
== 0) {
+        if (disc->verbose) {
+            printf("[PSD] Ignoring endpoint %s from own framework\n", uuid);
+        }
+        return;
+    }
+
+    if (disc->verbose) {
+        const char *uuid = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_UUID, "!Error!");
+        const char *type = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TYPE, "!Error!");
+        const char *admin = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+        const char *ser = celix_properties_get(endpoint, 
PUBSUB_SERIALIZER_TYPE_KEY, "!Error!");
+        printf("[PSD] Adding discovered endpoint %s. type is %s, admin is %s, 
serializer is %s.\n",
+               uuid, type, admin, ser);
+    }
 
     celixThreadMutex_lock(&disc->discoveredEndpointsMutex);
     bool exists = hashMap_containsKey(disc->discoveredEndpoints, (void*)uuid);
@@ -383,7 +467,7 @@ static void 
pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, cel
         }
         celixThreadMutex_unlock(&disc->discoveredEndpointsListenersMutex);
     } else {
-        fprintf(stderr, "[PSD] Warning unexpected update from an already 
existing endpoint (uuid is %s)\n", uuid);
+        //assuming this is the same endpoint -> ignore
     }
 }
 
@@ -393,6 +477,20 @@ static void 
pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc,
     celix_properties_t *endpoint = hashMap_remove(disc->discoveredEndpoints, 
(void*)uuid);
     celixThreadMutex_unlock(&disc->discoveredEndpointsMutex);
 
+    if (endpoint == NULL) {
+        //NOTE assuming this was an endpoint from this framework -> ignore
+        return;
+    }
+
+    if (disc->verbose) {
+        const char *uuid = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_UUID, "!Error!");
+        const char *type = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TYPE, "!Error!");
+        const char *admin = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+        const char *ser = celix_properties_get(endpoint, 
PUBSUB_SERIALIZER_TYPE_KEY, "!Error!");
+        printf("[PSD] Removing discovered endpoint %s. type is %s, admin is 
%s, serializer is %s.\n",
+               uuid, type, admin, ser);
+    }
+
     if (exists && endpoint != NULL) {
         celixThreadMutex_lock(&disc->discoveredEndpointsListenersMutex);
         hash_map_iterator_t iter = 
hashMapIterator_construct(disc->discoveredEndpointsListeners);

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h 
b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
index 11663cb..f0a5b22 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
@@ -40,20 +40,21 @@
 
 #define PUBSUB_DISCOVERY_SERVER_IP_DEFAULT             "127.0.0.1"
 #define PUBSUB_DISCOVERY_SERVER_PORT_DEFAULT   2379
-#define PUBSUB_DISCOVERY_SERVER_PATH_DEFAULT   "pubsub/discovery"
+#define PUBSUB_DISCOVERY_SERVER_PATH_DEFAULT   "pubsub/"
 #define PUBSUB_DISCOVERY_ETCD_TTL_DEFAULT       30
 
 typedef struct pubsub_discovery {
        bundle_context_pt context;
+       //TODO add logHelper
 
-       celix_thread_mutex_t discoveredEndpointsMutex;
+       celix_thread_mutex_t discoveredEndpointsMutex; //when locked together with 
discoveredEndpointsListenersMutex -> lock this one first
        hash_map_pt discoveredEndpoints; //<key = uuid,celix_properties_t 
/*endpoint*/>>
 
        celix_thread_mutex_t announcedEndpointsMutex;
        hash_map_pt announcedEndpoints; //<key = char* (etcd 
key),pubsub_announce_entry_t /*endpoint*/>>
 
        celix_thread_mutex_t discoveredEndpointsListenersMutex;
-       hash_map_pt discoveredEndpointsListeners; //key=svcId, 
value=pubsub_discover_listener_t
+       hash_map_pt discoveredEndpointsListeners; //key=svcId, 
value=pubsub_discovered_endpoint_listener_t
 
        celix_thread_mutex_t waitMutex;
        celix_thread_cond_t  waitCond;
@@ -69,6 +70,7 @@ typedef struct pubsub_discovery {
        bool verbose;
        int ttlForEntries;
        int sleepInsecBetweenTTLRefresh;
+       const char *fwUUID;
 } pubsub_discovery_t;
 
 typedef struct pubsub_announce_entry {
@@ -83,15 +85,10 @@ celix_status_t pubsub_discovery_destroy(pubsub_discovery_t 
*node_discovery);
 celix_status_t pubsub_discovery_start(pubsub_discovery_t *node_discovery);
 celix_status_t pubsub_discovery_stop(pubsub_discovery_t *node_discovery);
 
-celix_status_t pubsub_discovery_addNode(pubsub_discovery_t *node_discovery, 
pubsub_endpoint_pt pubEP);
-celix_status_t pubsub_discovery_removeNode(pubsub_discovery_t *node_discovery, 
pubsub_endpoint_pt pubEP);
-
 void pubsub_discovery_discoveredEndpointsListenerAdded(void *handle, void 
*svc, const celix_properties_t *props, const celix_bundle_t *bnd);
 void pubsub_discovery_discoveredEndpointsListenerRemoved(void *handle, void 
*svc, const celix_properties_t *props, const celix_bundle_t *bnd);
 
 celix_status_t pubsub_discovery_announceEndpoint(void *handle, const 
celix_properties_t *endpoint);
 celix_status_t pubsub_discovery_removeEndpoint(void *handle, const 
celix_properties_t *endpoint);
 
-celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_t 
*discovery, pubsub_endpoint_pt endpoint, bool endpointAdded);
-
 #endif /* PUBSUB_DISCOVERY_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt 
b/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt
index ce56037..010e864 100644
--- a/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt
@@ -23,7 +23,7 @@ add_celix_bundle(celix_pubsub_serializer_json
     VERSION "1.0.0"
        GROUP "Celix/PubSub"
     SOURCES
-               src/ps_activator.c
+        src/ps_json_serializer_activator.c
                src/pubsub_serializer_impl.c
 )
 target_include_directories(celix_pubsub_serializer_json PRIVATE

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_serializer_json/src/ps_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_serializer_json/src/ps_activator.c 
b/bundles/pubsub/pubsub_serializer_json/src/ps_activator.c
deleted file mode 100644
index 32dd1fc..0000000
--- a/bundles/pubsub/pubsub_serializer_json/src/ps_activator.c
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * ps_activator.c
- *
- *  \date       Mar 24, 2017
- *  \author            <a href="mailto:d...@celix.apache.org";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-
-#include "bundle_activator.h"
-#include "service_registration.h"
-#include "pubsub_constants.h"
-
-#include "pubsub_serializer_impl.h"
-
-struct activator {
-       pubsub_serializer_t* serializer;
-       pubsub_serializer_service_t* serializerService;
-       service_registration_pt registration;
-};
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void 
**userData) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator;
-
-       activator = calloc(1, sizeof(*activator));
-       if (!activator) {
-               status = CELIX_ENOMEM;
-       }
-       else{
-               *userData = activator;
-               status = pubsubSerializer_create(context, 
&(activator->serializer));
-       }
-
-       return status;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt 
context) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator = userData;
-       pubsub_serializer_service_t* pubsubSerializerSvc = calloc(1, 
sizeof(*pubsubSerializerSvc));
-
-       if (!pubsubSerializerSvc) {
-               status = CELIX_ENOMEM;
-       }
-       else{
-               pubsubSerializerSvc->handle = activator->serializer;
-
-               pubsubSerializerSvc->createSerializerMap = 
(void*)pubsubSerializer_createSerializerMap;
-               pubsubSerializerSvc->destroySerializerMap = 
(void*)pubsubSerializer_destroySerializerMap;
-               activator->serializerService = pubsubSerializerSvc;
-
-               /* Set serializer type */
-               properties_pt props = properties_create();
-               properties_set(props, PUBSUB_SERIALIZER_TYPE_KEY, 
PUBSUB_SERIALIZER_TYPE);
-
-               status = bundleContext_registerService(context, 
PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc, props, 
&activator->registration);
-
-       }
-
-       return status;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt 
context) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator = userData;
-
-       serviceRegistration_unregister(activator->registration);
-       activator->registration = NULL;
-
-       free(activator->serializerService);
-       activator->serializerService = NULL;
-
-       return status;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt 
context) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator = userData;
-
-       pubsubSerializer_destroy(activator->serializer);
-       activator->serializer = NULL;
-
-       free(activator);
-
-       return status;
-}
-
-

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c
----------------------------------------------------------------------
diff --git 
a/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c 
b/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c
new file mode 100644
index 0000000..b74bab8
--- /dev/null
+++ b/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c
@@ -0,0 +1,59 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdlib.h>
+#include <pubsub_constants.h>
+
+#include "celix_api.h"
+#include "pubsub_serializer_impl.h"
+
+typedef struct psjs_activator { // state handed to psjs_start/psjs_stop
+       pubsub_serializer_t* serializer; // created in psjs_start, destroyed in psjs_stop
+       ; // NOTE(review): stray empty declaration, looks unintentional
+       pubsub_serializer_service_t serializerSvc; // embedded service struct; its address is what psjs_start registers
+       long serializerSvcId; // id returned by celix_bundleContext_registerService; -1L when reset
+} psjs_activator_t;
+
+static int psjs_start(psjs_activator_t *act, celix_bundle_context_t *ctx) { // start callback given to CELIX_GEN_BUNDLE_ACTIVATOR: creates the serializer and registers it as a service
+       act->serializerSvcId = -1L; // reset; only overwritten below when the serializer was created
+
+       celix_status_t status = pubsubSerializer_create(ctx, 
&(act->serializer)); // this status is the only value the function returns
+       if (status == CELIX_SUCCESS) { // register the service only when creation succeeded
+               act->serializerSvc.handle = act->serializer; // handle stored in the service struct alongside the two callbacks
+
+               act->serializerSvc.createSerializerMap = 
(void*)pubsubSerializer_createSerializerMap; // cast through void* — TODO confirm the signature matches the service field
+               act->serializerSvc.destroySerializerMap = 
(void*)pubsubSerializer_destroySerializerMap; // same cast as above
+
+               /* Set serializer type */
+               celix_properties_t *props = celix_properties_create();
+               celix_properties_set(props, PUBSUB_SERIALIZER_TYPE_KEY, 
PUBSUB_JSON_SERIALIZER_TYPE); // service property identifying this serializer's type
+
+               act->serializerSvcId = celix_bundleContext_registerService(ctx, 
&act->serializerSvc, PUBSUB_SERIALIZER_SERVICE_NAME, props); // NOTE(review): returned id is stored but not checked; it never affects status
+       }
+       return status;
+}
+
+static int psjs_stop(psjs_activator_t *act, celix_bundle_context_t *ctx) { // stop callback given to CELIX_GEN_BUNDLE_ACTIVATOR: unregisters the service, then destroys the serializer
+       celix_bundleContext_unregisterService(ctx, act->serializerSvcId); // NOTE(review): also called when the id is still -1L — presumably tolerated by the API, confirm
+       act->serializerSvcId = -1L; // same reset value psjs_start uses
+       pubsubSerializer_destroy(act->serializer); // any result of the destroy call is not inspected
+       return CELIX_SUCCESS; // unconditional success
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(psjs_activator_t, psjs_start, psjs_stop)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h 
b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
index c36f20e..75b72cb 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
@@ -16,13 +16,6 @@
  *specific language governing permissions and limitations
  *under the License.
  */
-/*
- * pubsub_serializer_impl.h
- *
- *  \date       Mar 24, 2017
- *  \author            <a href="mailto:d...@celix.apache.org";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
 
 #ifndef PUBSUB_SERIALIZER_JSON_H_
 #define PUBSUB_SERIALIZER_JSON_H_
@@ -34,7 +27,7 @@
 
 #include "pubsub_serializer.h"
 
-#define PUBSUB_SERIALIZER_TYPE "json"
+#define PUBSUB_JSON_SERIALIZER_TYPE    "json"
 
 typedef struct pubsub_serializer {
        bundle_context_pt bundle_context;

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/CMakeLists.txt 
b/bundles/pubsub/pubsub_spi/CMakeLists.txt
index c565660..7c27ed6 100644
--- a/bundles/pubsub/pubsub_spi/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_spi/CMakeLists.txt
@@ -16,7 +16,7 @@
 # under the License.
 
 add_library(pubsub_spi STATIC
-               src/pubsub_admin_match.c
+        src/pubsub_utils_match.c
         src/pubsub_endpoint.c
         src/pubsub_utils.c
 )

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h 
b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
index 5379415..8c15fcf 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -27,44 +27,41 @@
 #ifndef PUBSUB_ADMIN_H_
 #define PUBSUB_ADMIN_H_
 
-#include "service_reference.h"
+#include "celix_properties.h"
+#include "celix_bundle.h"
+#include "celix_filter.h"
 
-#include "pubsub_common.h"
-#include "pubsub_endpoint.h"
+#define PUBSUB_ADMIN_SERVICE_NAME              "pubsub_admin"
+#define PUBSUB_ADMIN_SERVICE_VERSION   "2.0.0"
+#define PUBSUB_ADMIN_SERVICE_RANGE             "[2,3)"
 
-#include "pubsub_constants.h"
+//expected service properties
+#define PUBSUB_ADMIN_SERVICE_TYPE              "psa_type"
 
-
-typedef struct pubsub_admin *pubsub_admin_pt;
+#define PUBSUB_ADMIN_FULL_MATCH_SCORE  100.0F
+#define PUBSUB_ADMIN_NO_MATCH_SCORE        0.0F
 
 struct pubsub_admin_service {
-       pubsub_admin_pt admin;
-
-       celix_status_t (*addSubscription)(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
-       celix_status_t (*removeSubscription)(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
-
-       celix_status_t (*addPublication)(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
-       celix_status_t (*removePublication)(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
+       void *handle;
 
-       celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* 
scope, char* topic);
-       celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* 
scope, char* topic);
+       celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, 
const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
+       celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, 
const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
+       celix_status_t (*matchEndpoint)(void *handle, const celix_properties_t 
*endpoint, double *score);
 
-       //TODO add match function for subscription service and publication 
listeners, e.g.:
-       //matchPublisherListener(admin, bundle, filter, outScore)
-       //matchSubscriberService(admin, svcRef, outScore)
+       //note endpoint is owned by caller
+       celix_status_t (*setupTopicSender)(void *handle, const char *scope, 
const char *topic, long serializerSvcId, celix_properties_t 
**publisherEndpoint);
+       celix_status_t (*teardownTopicSender)(void *handle, const char *scope, 
const char *topic);
 
-       /* Match principle:
-        * - A full matching pubsub_admin gives 100 points
-        */
-       //TODO this should only be called for remote endpoints (e.g. not 
endpoints from this framework
-       celix_status_t (*matchEndpoint)(pubsub_admin_pt admin, 
pubsub_endpoint_pt endpoint, double* score);
+       //note endpoint is owned by caller
+       celix_status_t (*setupTopicReciever)(void *handle, const char *scope, 
const char *topic, long serializerSvcId, celix_properties_t 
**subscriberEndpoint);
+       celix_status_t (*teardownTopicReciever)(void *handle, const char 
*scope, const char *topic);
 
-        //TODO redesign add function for handling endpoint seperate, e.g.: 
-        //addEndpoint(admin, endpoint); 
-        //note that endpoints can be subscribers and publishers
-        //Also note that we than can have pending subscribers and pending 
(subscriber/publisher) endpoints.
+       celix_status_t (*addEndpoint)(void *handle, const celix_properties_t 
*endpoint);
+       celix_status_t (*removeEndpoint)(void *handle, const celix_properties_t 
*endpoint);
 };
 
-typedef struct pubsub_admin_service *pubsub_admin_service_pt;
+typedef struct pubsub_admin_service pubsub_admin_service_t;
 
 #endif /* PUBSUB_ADMIN_H_ */
+
+

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/include/pubsub_admin_match.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin_match.h 
b/bundles/pubsub/pubsub_spi/include/pubsub_admin_match.h
deleted file mode 100644
index 08d6582..0000000
--- a/bundles/pubsub/pubsub_spi/include/pubsub_admin_match.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-
-#ifndef PUBSUB_ADMIN_MATCH_H_
-#define PUBSUB_ADMIN_MATCH_H_
-
-#include "celix_errno.h"
-#include "properties.h"
-#include "array_list.h"
-
-#include "pubsub_serializer.h"
-
-#define QOS_ATTRIBUTE_KEY      "attribute.qos"
-#define QOS_TYPE_SAMPLE                "sample"        /* A.k.a. unreliable 
connection */
-#define QOS_TYPE_CONTROL       "control"       /* A.k.a. reliable connection */
-
-#define PUBSUB_ADMIN_FULL_MATCH_SCORE  100.0F
-
-celix_status_t pubsub_admin_match(
-        pubsub_endpoint_pt endpoint,
-        const char *pubsub_admin_type,
-        const char *frameworkUuid,
-        double sampleScore,
-        double controlScore,
-        double defaultScore,
-        array_list_pt serializerList,
-        double *score);
-celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, 
array_list_pt serializerList, service_reference_pt *out);
-
-#endif /* PUBSUB_ADMIN_MATCH_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/include/pubsub_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_common.h 
b/bundles/pubsub/pubsub_spi/include/pubsub_common.h
index cfed5d9..e551031 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_common.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_common.h
@@ -27,13 +27,9 @@
 #ifndef PUBSUB_COMMON_H_
 #define PUBSUB_COMMON_H_
 
-#define PUBSUB_SERIALIZER_SERVICE                                      
"pubsub_serializer"
-#define PUBSUB_ADMIN_SERVICE                                           
"pubsub_admin"
 #define PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE      
"pubsub_announce_endpoint_listener"
 #define PUBSUB_DISCOVERED_ENDPOINT_LISTENER_SERVICE    
"pubsub_discovered_endpoint_listener"
 
-#define PUBSUB_ANY_SUB_TOPIC                                   "any"
-
 #define MAX_SCOPE_LEN                                  1024
 #define MAX_TOPIC_LEN                                                          
1024
 

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h 
b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
index 47e31d3..be47318 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
@@ -20,10 +20,6 @@
 #ifndef PUBSUB_CONSTANTS_H_
 #define PUBSUB_CONSTANTS_H_
 
-#define PSA_IP         "PSA_IP"
-#define PSA_ITF        "PSA_INTERFACE"
-#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX"
-
 #define PUBSUB_ADMIN_TYPE_KEY     "pubsub.config"
 #define PUBSUB_SERIALIZER_TYPE_KEY "pubsub.serializer.type"
 

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h 
b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
index 0d8c513..b03109d 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@ -43,33 +43,11 @@
 #define PUBSUB_SUBSCRIBER_ENDPOINT_TYPE "pubsub.subscriber"
 
 
-struct pubsub_endpoint {
-    const char *topicName;
-    const char *topicScope;
+celix_properties_t* pubsubEndpoint_create(const char* fwUUID, const char* 
scope, const char* topic, const char* pubsubType, const char* adminType, const 
char *serType, celix_properties_t *topic_props);
+celix_properties_t* pubsubEndpoint_createFromSubscriberSvc(bundle_context_t* 
ctx, long svcBndId, const celix_properties_t *svcProps);
+celix_properties_t* 
pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context_t *ctx, long 
bundleId, const char *filter);
 
-    const char *uuid;
-    const char *frameworkUUid;
-    const char *type;
-    const char *adminType;
-    const char *serializerType;
-
-    celix_properties_t *properties;
-};
-
-typedef struct pubsub_endpoint *pubsub_endpoint_pt;
-typedef struct pubsub_endpoint pubsub_endpoint_t;
-
-celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, 
const char* topic, const char* pubsubType, const char* adminType, const char 
*serType, celix_properties_t *topic_props, pubsub_endpoint_pt* psEp);
-celix_status_t pubsubEndpoint_createFromProperties(const celix_properties_t 
*props, pubsub_endpoint_t **out);
-celix_status_t pubsubEndpoint_createFromSvc(bundle_context_t* ctx, const 
celix_bundle_t *bnd, const celix_properties_t *svcProps, bool isPublisher, 
pubsub_endpoint_pt* out);
-celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t 
*ctx, const celix_service_tracker_info_t *info, bool isPublisher, 
pubsub_endpoint_pt* out);
-celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt 
*out);
-
-void pubsubEndpoint_destroy(pubsub_endpoint_pt psEp);
-bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2);
-bool pubsubEndpoint_equalsWithProperties(pubsub_endpoint_pt psEp1, const 
celix_properties_t *props);
-
-void pubsubEndpoint_setField(pubsub_endpoint_t *ep, const char *key, const 
char *val);
+bool pubsubEndpoint_equals(const celix_properties_t *psEp1, const 
celix_properties_t *psEp2);
 
 //check if the required properties are available for the endpoint
 bool pubsubEndpoint_isValid(const celix_properties_t *endpointProperties, bool 
requireAdminType, bool requireSerializerType);

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h 
b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
index a91e820..9fa3340 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
@@ -16,21 +16,13 @@
  *specific language governing permissions and limitations
  *under the License.
  */
-/*
- * pubsub_serializer.h
- *
- *  \date       Mar 24, 2017
- *  \author            <a href="mailto:d...@celix.apache.org";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
 
 #ifndef PUBSUB_SERIALIZER_SERVICE_H_
 #define PUBSUB_SERIALIZER_SERVICE_H_
 
-#include "service_reference.h"
 #include "hash_map.h"
-
-#include "pubsub_common.h"
+#include "version.h"
+#include "celix_bundle.h"
 
 /**
  * There should be a pubsub_serializer_t
@@ -41,6 +33,10 @@
  * the extender pattern.
  */
 
+#define PUBSUB_SERIALIZER_SERVICE_NAME         "pubsub_serializer"
+#define PUBSUB_SERIALIZER_SERVICE_VERSION      "1.0.0"
+#define PUBSUB_SERIALIZER_SERVICE_RANGE                "[1,2)"
+
 typedef struct pubsub_msg_serializer {
        void* handle;
        unsigned int msgId;
@@ -56,7 +52,7 @@ typedef struct pubsub_msg_serializer {
 typedef struct pubsub_serializer_service {
        void* handle;
 
-       celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, 
hash_map_pt* serializerMap);
+       celix_status_t (*createSerializerMap)(void* handle, celix_bundle_t 
*bundle, hash_map_pt* serializerMap);
        celix_status_t (*destroySerializerMap)(void* handle, hash_map_pt 
serializerMap);
 
 } pubsub_serializer_service_t;

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h 
b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
index 1c92a9b..14f9bb0 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -16,19 +16,18 @@
  *specific language governing permissions and limitations
  *under the License.
  */
-/*
- * pubsub_utils.h
- *
- *  \date       Sep 24, 2015
- *  \author            <a href="mailto:d...@celix.apache.org";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
 
 #ifndef PUBSUB_UTILS_H_
 #define PUBSUB_UTILS_H_
 
 #include "bundle_context.h"
-#include "array_list.h"
+#include "celix_array_list.h"
+#include "celix_bundle_context.h"
+
+#define PUBSUB_UTILS_QOS_ATTRIBUTE_KEY     "qos"
+#define PUBSUB_UTILS_QOS_TYPE_SAMPLE           "sample"        /* A.k.a. 
unreliable connection */
+#define PUBSUB_UTILS_QOS_TYPE_CONTROL      "control"   /* A.k.a. reliable 
connection */
+
 
 /**
  * Returns the pubsub info from the provided filter. A pubsub filter should 
have a topic and can 
@@ -41,5 +40,37 @@ celix_status_t pubsub_getPubSubInfoFromFilter(const char* 
filterstr, char **topi
 
 char* pubsub_getKeysBundleDir(bundle_context_pt ctx);
 
+double pubsub_utils_matchPublisher(
+        celix_bundle_context_t *ctx,
+        long bundleId,
+        const char *filter,
+        const char *adminType,
+        double sampleScore,
+        double controlScore,
+        double defaultScore,
+        long *outSerializerSvcId);
+
+double pubsub_utils_matchSubscriber(
+        celix_bundle_context_t *ctx,
+        long svcProviderBundleId,
+        const celix_properties_t *svcProperties,
+        const char *adminType,
+        double sampleScore,
+        double controlScore,
+        double defaultScore,
+        long *outSerializerSvcId);
+
+double pubsub_utils_matchEndpoint(
+        celix_bundle_context_t *ctx,
+        const celix_properties_t *endpoint,
+        const char *adminType,
+        double sampleScore,
+        double controlScore,
+        double defaultScore,
+        long *outSerializerSvcId);
+
+
+celix_properties_t* pubsub_utils_getTopicProperties(const celix_bundle_t 
*bundle, const char *topic, bool isPublisher);
+
 
 #endif /* PUBSUB_UTILS_H_ */

Reply via email to