Repository: celix
Updated Branches:
  refs/heads/develop a8b8410f1 -> 0a5ef69a6


Changing pubsub endpoint to store more information in etcd


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/9094f554
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/9094f554
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/9094f554

Branch: refs/heads/develop
Commit: 9094f554217e963797910306359c89b145518724
Parents: d7307ff
Author: Roy Lenferink <rlenfer...@apache.org>
Authored: Wed Jan 31 17:26:11 2018 +0100
Committer: Roy Lenferink <rlenfer...@apache.org>
Committed: Fri Feb 2 11:48:19 2018 +0100

----------------------------------------------------------------------
 .../private/src/pubsub_admin_impl.c             | 103 ++++++++++++-------
 .../private/src/topic_publication.c             |  15 +--
 .../private/src/pubsub_admin_impl.c             |  99 +++++++++++-------
 .../private/src/topic_publication.c             |  13 +--
 .../public/include/pubsub_endpoint.h            |  19 ++--
 .../pubsub_common/public/src/pubsub_endpoint.c  |  88 ++++++++++------
 .../pubsub_discovery/private/src/etcd_watcher.c |  56 +++++++++-
 .../pubsub_discovery/private/src/etcd_writer.c  |  44 ++++++--
 .../private/src/pubsub_discovery_impl.c         |  19 ++--
 .../private/src/pubsub_topology_manager.c       |  40 ++++---
 utils/private/src/properties.c                  |   5 +
 utils/public/include/properties.h               |   2 +
 12 files changed, 343 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/9094f554/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c 
b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index 1fbdb08..8124950 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -311,7 +311,9 @@ static celix_status_t 
pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
                        status = 
pubsub_topicSubscriptionCreate(admin->bundle_context, admin->ifIpAddress, 
PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, 
&any_sub);
                }
                else{
-                       printf("PSA_UDP_MC: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",subEP->topic);
+                       printf("PSA_UDP_MC: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",
+                                  properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
+
                        celixThreadMutex_lock(&admin->noSerializerPendingsLock);
                        arrayList_add(admin->noSerializerSubscriptions,subEP);
                        
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
@@ -330,8 +332,8 @@ static celix_status_t 
pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
                                if(topic_publishers!=NULL){
                                        
for(i=0;i<arrayList_size(topic_publishers);i++){
                                                pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-                                               if(pubEP->endpoint !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
+                                               
if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
                                                }
                                        }
                                        arrayList_destroy(topic_publishers);
@@ -348,8 +350,8 @@ static celix_status_t 
pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
                                if(ext_pub_list!=NULL){
                                        
for(i=0;i<arrayList_size(ext_pub_list);i++){
                                                pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                                               if(pubEP->endpoint !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
+                                               
if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
                                                }
                                        }
                                }
@@ -379,9 +381,13 @@ static celix_status_t 
pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
        celix_status_t status = CELIX_SUCCESS;
 
-       printf("PSA_UDP_MC: Received subscription [FWUUID=%s bundleID=%ld 
scope=%s, 
topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope,subEP->topic);
+       printf("PSA_UDP_MC: Received subscription [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",
+                  properties_get(subEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
+                  subEP->serviceID,
+                  properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
+                  properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
-       if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){
+       if(strcmp(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),PUBSUB_ANY_SUB_TOPIC)==0){
                return pubsubAdmin_addAnySubscription(admin,subEP);
        }
 
@@ -391,7 +397,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint
        celixThreadMutex_lock(&admin->localPublicationsLock);
        celixThreadMutex_lock(&admin->externalPublicationsLock);
 
-       char* scope_topic = createScopeTopicKey(subEP->scope,subEP->topic);
+       char* scope_topic = 
createScopeTopicKey(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
        service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
        array_list_pt ext_pub_list = 
(array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
@@ -406,10 +412,12 @@ celix_status_t 
pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
                if(subscription == NULL) {
                        pubsub_serializer_service_t *best_serializer = NULL;
                        if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer)) == CELIX_SUCCESS){
-                               status += 
pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, 
subEP->scope, subEP->topic, best_serializer, &subscription);
+                               status += 
pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, 
(char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) 
properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), best_serializer, 
&subscription);
                        }
                        else{
-                               printf("PSA_UDP_MC: Cannot find a serializer 
for subscribing topic %s. Adding it to pending list.\n",subEP->topic);
+                               printf("PSA_UDP_MC: Cannot find a serializer 
for subscribing topic %s. Adding it to pending list.\n",
+                                          
properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+
                                
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
                                
arrayList_add(admin->noSerializerSubscriptions,subEP);
                                
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
@@ -425,8 +433,8 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint
                                        if(topic_publishers!=NULL){
                                                
for(i=0;i<arrayList_size(topic_publishers);i++){
                                                        pubsub_endpoint_pt 
pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-                                                       if(pubEP->endpoint 
!=NULL){
-                                                               status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
+                                                       
if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+                                                               status += 
pubsub_topicSubscriptionConnectPublisher(subscription, (char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
                                                        }
                                                }
                                                
arrayList_destroy(topic_publishers);
@@ -438,8 +446,8 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint
                                if(ext_pub_list!=NULL){
                                        
for(i=0;i<arrayList_size(ext_pub_list);i++){
                                                pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                                               if(pubEP->endpoint !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
+                                               
if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+                                                       status += 
pubsub_topicSubscriptionConnectPublisher(subscription, (char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
                                                }
                                        }
                                }
@@ -476,9 +484,13 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint
 celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
        celix_status_t status = CELIX_SUCCESS;
 
-       printf("PSA_UDP_MC: Removing subscription [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, 
subEP->topic);
+       printf("PSA_UDP_MC: Removing subscription [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",
+                  properties_get(subEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
+                  subEP->serviceID,
+                  properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
+                  properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
-       char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
+       char* scope_topic = 
createScopeTopicKey(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
        celixThreadMutex_lock(&admin->subscriptionsLock);
        topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
@@ -510,7 +522,11 @@ celix_status_t 
pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo
 celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt 
admin,pubsub_endpoint_pt pubEP){
        celix_status_t status = CELIX_SUCCESS;
 
-       printf("PSA_UDP_MC: Received publication [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, 
pubEP->topic);
+       printf("PSA_UDP_MC: Received publication [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",
+                  properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
+                  pubEP->serviceID,
+                  properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
+                  properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
        const char* fwUUID = NULL;
 
@@ -519,9 +535,9 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt 
admin,pubsub_endpoint_
                printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
                return CELIX_INVALID_BUNDLE_CONTEXT;
        }
-       char* scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
+       char* scope_topic = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
-       if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == 
NULL)) {
+       if ((strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) && 
(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) {
 
                celixThreadMutex_lock(&admin->localPublicationsLock);
 
@@ -534,7 +550,9 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt 
admin,pubsub_endpoint_
                                status = 
pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, 
admin->mcIpAddress, &pub);
                        }
                        else{
-                               printf("PSA_UDP_MC: Cannot find a serializer 
for publishing topic %s. Adding it to pending list.\n", pubEP->topic);
+                               printf("PSA_UDP_MC: Cannot find a serializer 
for publishing topic %s. Adding it to pending list.\n",
+                                          
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+
                                
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
                                
arrayList_add(admin->noSerializerPublications,pubEP);
                                
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
@@ -547,7 +565,10 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt 
admin,pubsub_endpoint_
                                        connectTopicPubSubToSerializer(admin, 
best_serializer, pub, true);
                                }
                        } else {
-                               printf("PSA_UDP_MC: Cannot create a 
topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, 
pubEP->topic, pubEP->serviceID);
+                               printf("PSA_UDP_MC: Cannot create a 
topicPublication for scope=%s, topic=%s (bundle %ld).\n",
+                                          
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
+                                          
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
+                                          pubEP->serviceID);
                        }
                } else {
                        //just add the new EP to the list
@@ -595,14 +616,14 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt 
admin,pubsub_endpoint_
        celixThreadMutex_lock(&admin->subscriptionsLock);
 
        topic_subscription_pt sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, scope_topic);
-       if (sub != NULL && pubEP->endpoint != NULL) {
-               pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, 
pubEP->endpoint);
+       if (sub != NULL && properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL) != NULL) {
+               pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, 
(char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
        }
 
        /* And check also for ANY subscription */
        topic_subscription_pt any_sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
-       if (any_sub != NULL && pubEP->endpoint != NULL) {
-               
pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, 
pubEP->endpoint);
+       if (any_sub != NULL && properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL) != NULL) {
+               
pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
        }
 
        free(scope_topic);
@@ -617,7 +638,11 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
        celix_status_t status = CELIX_SUCCESS;
        int count = 0;
 
-       printf("PSA_UDP_MC: Removing publication [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, 
pubEP->topic);
+       printf("PSA_UDP_MC: Removing publication [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",
+                  properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
+                  pubEP->serviceID,
+                  properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
+                  properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
        const char* fwUUID = NULL;
 
@@ -626,9 +651,9 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
                printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
                return CELIX_INVALID_BUNDLE_CONTEXT;
        }
-       char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
+       char *scope_topic = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
-       if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
+       if(strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
 
                celixThreadMutex_lock(&admin->localPublicationsLock);
                service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
@@ -665,7 +690,7 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
                        // Check if there are more publishers on the same 
endpoint (happens when 1 celix-instance with multiple bundles publish in same 
topic)
                        for(i=0; i<arrayList_size(ext_pub_list);i++) {
                                pubsub_endpoint_pt p  = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                               if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
+                               if 
(strcmp(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL),properties_get(p->endpoint_props, PUBSUB_ENDPOINT_URL)) == 
0) {
                                        count++;
                                }
                        }
@@ -687,14 +712,14 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
        celixThreadMutex_lock(&admin->subscriptionsLock);
 
        topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
-       if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){
-               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
+       if(sub!=NULL && properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL)!=NULL && count == 0){
+               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub, (char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
        }
 
        /* And check also for ANY subscription */
        topic_subscription_pt any_sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-       if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
-               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
+       if(any_sub!=NULL && properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL)!=NULL && count == 0){
+               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub, (char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
        }
 
        free(scope_topic);
@@ -793,7 +818,7 @@ static celix_status_t pubsubAdmin_getIpAddress(const char* 
interface, char** ip)
 static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
        celix_status_t status = CELIX_SUCCESS;
 
-       char* scope_topic =createScopeTopicKey(subEP->scope, subEP->topic);
+       char* scope_topic 
=createScopeTopicKey(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
        array_list_pt pendingListPerTopic = 
hashMap_get(admin->pendingSubscriptions,scope_topic);
        if(pendingListPerTopic==NULL){
                arrayList_create(&pendingListPerTopic);
@@ -888,9 +913,8 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, 
service_reference_pt
                                /* Remove the publication */
                                pubsubAdmin_removePublication(admin, pubEP);
                                /* Reset the endpoint field, so that will be 
recreated from scratch when a new serializer will be found */
-                               if(pubEP->endpoint!=NULL){
-                                       free(pubEP->endpoint);
-                                       pubEP->endpoint = NULL;
+                               if(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL)!=NULL){
+                                       properties_unset(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL);
                                }
                                /* Add the orphan endpoint to the noSerializer 
pending list */
                                
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -940,9 +964,8 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, 
service_reference_pt
                                /* Remove the subscription */
                                pubsubAdmin_removeSubscription(admin, subEP);
                                /* Reset the endpoint field, so that will be 
recreated from scratch when a new serializer will be found */
-                               if(subEP->endpoint!=NULL){
-                                       free(subEP->endpoint);
-                                       subEP->endpoint = NULL;
+                               if(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_URL)!=NULL){
+                                       properties_unset(subEP->endpoint_props, 
PUBSUB_ENDPOINT_URL);
                                }
                                /* Add the orphan endpoint to the noSerializer 
pending list */
                                
celixThreadMutex_lock(&admin->noSerializerPendingsLock);

http://git-wip-us.apache.org/repos/asf/celix/blob/9094f554/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c 
b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
index e43ec29..52abeda 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
@@ -174,14 +174,17 @@ celix_status_t 
pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
                factory->ungetService = pubsub_topicPublicationUngetService;
 
                properties_pt props = properties_create();
-               properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
-               properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
+               
properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props,
 PUBSUB_ENDPOINT_SCOPE));
+               
properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props,
 PUBSUB_ENDPOINT_TOPIC));
 
                status = 
bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
 
                if(status != CELIX_SUCCESS){
                        properties_destroy(props);
-                       printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register 
ServiceFactory for topic %s, topic %s (bundle %ld).\n",pubEP->scope, 
pubEP->topic,pubEP->serviceID);
+                       printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register 
ServiceFactory for topic %s, topic %s (bundle %ld).\n",
+                                  properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),
+                                  properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),
+                                  pubEP->serviceID);
                }
                else{
                        *svcFactory = factory;
@@ -202,7 +205,7 @@ celix_status_t 
pubsub_topicPublicationStop(topic_publication_pt pub){
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep){
 
        celixThreadMutex_lock(&(pub->tp_lock));
-       ep->endpoint = strdup(pub->endpoint);
+       pubsubEndpoint_setField(ep, PUBSUB_ENDPOINT_URL, pub->endpoint);
        arrayList_add(pub->pub_ep_list,ep);
        celixThreadMutex_unlock(&(pub->tp_lock));
 
@@ -393,8 +396,8 @@ static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(to
                }
 
                pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
-               bound->scope=strdup(pubEP->scope);
-               bound->topic=strdup(pubEP->topic);
+               bound->scope=strdup(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE));
+               bound->topic=strdup(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
                bound->largeUdpHandle = largeUdp_create(1);
 
                bound->service.handle = bound;

http://git-wip-us.apache.org/repos/asf/celix/blob/9094f554/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c 
b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index 29ead0c..9a486dd 100644
--- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -312,7 +312,8 @@ static celix_status_t 
pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
                        status = 
pubsub_topicSubscriptionCreate(admin->bundle_context, 
PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, 
&any_sub);
                }
                else{
-                       printf("PSA_ZMQ: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",subEP->topic);
+                       printf("PSA_ZMQ: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",
+                                  properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
                        celixThreadMutex_lock(&admin->noSerializerPendingsLock);
                        arrayList_add(admin->noSerializerSubscriptions,subEP);
                        
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
@@ -331,8 +332,8 @@ static celix_status_t 
pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
                                if(topic_publishers!=NULL){
                                        
for(i=0;i<arrayList_size(topic_publishers);i++){
                                                pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-                                               if(pubEP->endpoint !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
+                                               
if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
                                                }
                                        }
                                        arrayList_destroy(topic_publishers);
@@ -349,8 +350,8 @@ static celix_status_t 
pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
                                if(ext_pub_list!=NULL){
                                        
for(i=0;i<arrayList_size(ext_pub_list);i++){
                                                pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                                               if(pubEP->endpoint !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
+                                               
if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
                                                }
                                        }
                                }
@@ -380,9 +381,13 @@ static celix_status_t 
pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
        celix_status_t status = CELIX_SUCCESS;
 
-       printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, 
subEP->topic);
+       printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",
+                  properties_get(subEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
+                  subEP->serviceID,
+                  properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
+                  properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
-       if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){
+       if(strcmp(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),PUBSUB_ANY_SUB_TOPIC)==0){
                return pubsubAdmin_addAnySubscription(admin,subEP);
        }
 
@@ -392,7 +397,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint
        celixThreadMutex_lock(&admin->localPublicationsLock);
        celixThreadMutex_lock(&admin->externalPublicationsLock);
 
-       char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
+       char* scope_topic = 
createScopeTopicKey(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
        service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
        array_list_pt ext_pub_list = 
(array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
@@ -407,10 +412,11 @@ celix_status_t 
pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
                if(subscription == NULL) {
                        pubsub_serializer_service_t *best_serializer = NULL;
                        if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer)) == CELIX_SUCCESS){
-                               status += 
pubsub_topicSubscriptionCreate(admin->bundle_context,subEP->scope, 
subEP->topic, best_serializer, &subscription);
+                               status += 
pubsub_topicSubscriptionCreate(admin->bundle_context, (char*) 
properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) 
properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), best_serializer, 
&subscription);
                        }
                        else{
-                               printf("PSA_ZMQ: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",subEP->topic);
+                               printf("PSA_ZMQ: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",
+                                          
properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
                                
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
                                
arrayList_add(admin->noSerializerSubscriptions,subEP);
                                
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
@@ -426,8 +432,8 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint
                                        if(topic_publishers!=NULL){
                                                
for(i=0;i<arrayList_size(topic_publishers);i++){
                                                        pubsub_endpoint_pt 
pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-                                                       if(pubEP->endpoint 
!=NULL){
-                                                               status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
+                                                       
if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+                                                               status += 
pubsub_topicSubscriptionConnectPublisher(subscription,(char*)properties_get(pubEP->endpoint_props,
 PUBSUB_ENDPOINT_URL));
                                                        }
                                                }
                                                
arrayList_destroy(topic_publishers);
@@ -439,8 +445,8 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint
                                if(ext_pub_list!=NULL){
                                        
for(i=0;i<arrayList_size(ext_pub_list);i++){
                                                pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                                               if(pubEP->endpoint !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
+                                               
if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+                                                       status += 
pubsub_topicSubscriptionConnectPublisher(subscription,(char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
                                                }
                                        }
                                }
@@ -477,9 +483,12 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint
 celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
        celix_status_t status = CELIX_SUCCESS;
 
-       printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld 
topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->topic);
+       printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld 
topic=%s]\n",
+                  properties_get(subEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
+                  subEP->serviceID,
+                  properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
-       char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
+       char* scope_topic = 
createScopeTopicKey(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
        celixThreadMutex_lock(&admin->subscriptionsLock);
        topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
@@ -511,7 +520,11 @@ celix_status_t 
pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo
 celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, 
pubsub_endpoint_pt pubEP) {
        celix_status_t status = CELIX_SUCCESS;
 
-       printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, 
topic=%s]\n", pubEP->frameworkUUID, pubEP->serviceID, pubEP->scope, 
pubEP->topic);
+       printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, 
topic=%s]\n",
+                  properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
+                  pubEP->serviceID,
+                  properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
+                  properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
        const char* fwUUID = NULL;
 
@@ -521,9 +534,10 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt 
admin, pubsub_endpoint
                return CELIX_INVALID_BUNDLE_CONTEXT;
        }
 
-       char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
+       char *scope_topic = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
-       if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == 
NULL)) {
+       if ((strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) &&
+                       (properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL) == NULL)) {
 
                celixThreadMutex_lock(&admin->localPublicationsLock);
 
@@ -536,7 +550,8 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt 
admin, pubsub_endpoint
                                status = 
pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, 
admin->ipAddress, admin->basePort, admin->maxPort, &pub);
                        }
                        else{
-                               printf("PSA_ZMQ: Cannot find a serializer for 
publishing topic %s. Adding it to pending list.\n", pubEP->topic);
+                               printf("PSA_ZMQ: Cannot find a serializer for 
publishing topic %s. Adding it to pending list.\n",
+                                          
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
                                
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
                                
arrayList_add(admin->noSerializerPublications,pubEP);
                                
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
@@ -549,7 +564,10 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt 
admin, pubsub_endpoint
                                        connectTopicPubSubToSerializer(admin, 
best_serializer, pub, true);
                                }
                        } else {
-                               printf("PSA_ZMQ: Cannot create a 
topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, 
pubEP->topic, pubEP->serviceID);
+                               printf("PSA_ZMQ: Cannot create a 
topicPublication for scope=%s, topic=%s (bundle %ld).\n",
+                                          
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
+                                          
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
+                                          pubEP->serviceID);
                        }
                } else {
                        //just add the new EP to the list
@@ -597,14 +615,14 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt 
admin, pubsub_endpoint
        celixThreadMutex_lock(&admin->subscriptionsLock);
 
        topic_subscription_pt sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, scope_topic);
-       if (sub != NULL && pubEP->endpoint != NULL) {
-               pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, 
pubEP->endpoint);
+       if (sub != NULL && properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL) != NULL) {
+               pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, 
(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
        }
 
        /* And check also for ANY subscription */
        topic_subscription_pt any_sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
-       if (any_sub != NULL && pubEP->endpoint != NULL) {
-               
pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, 
pubEP->endpoint);
+       if (any_sub != NULL && properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL) != NULL) {
+               
pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, 
(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
        }
 
        free(scope_topic);
@@ -619,7 +637,10 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
        celix_status_t status = CELIX_SUCCESS;
        int count = 0;
 
-       printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld 
topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->topic);
+       printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld 
topic=%s]\n",
+                  properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
+                  pubEP->serviceID,
+                  properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
        const char* fwUUID = NULL;
 
@@ -628,9 +649,9 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
                printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
                return CELIX_INVALID_BUNDLE_CONTEXT;
        }
-       char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
+       char *scope_topic = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
-       if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
+       if(strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
 
                celixThreadMutex_lock(&admin->localPublicationsLock);
                service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
@@ -666,7 +687,7 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
                        // Check if there are more publishers on the same 
endpoint (happens when 1 celix-instance with multiple bundles publish in same 
topic)
                        for(i=0; i<arrayList_size(ext_pub_list);i++) {
                                pubsub_endpoint_pt p  = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                               if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
+                               if 
(strcmp(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL),properties_get(p->endpoint_props, PUBSUB_ENDPOINT_URL)) == 
0) {
                                        count++;
                                }
                        }
@@ -688,14 +709,14 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
        celixThreadMutex_lock(&admin->subscriptionsLock);
 
        topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
-       if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){
-               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
+       if(sub!=NULL && properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL)!=NULL && count == 0){
+               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,(char*)properties_get(pubEP->endpoint_props,
 PUBSUB_ENDPOINT_URL));
        }
 
        /* And check also for ANY subscription */
        topic_subscription_pt any_sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-       if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
-               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
+       if(any_sub!=NULL && properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL)!=NULL && count == 0){
+               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,(char*)properties_get(pubEP->endpoint_props,
 PUBSUB_ENDPOINT_URL));
        }
 
        free(scope_topic);
@@ -793,7 +814,7 @@ static celix_status_t pubsubAdmin_getIpAdress(const char* 
interface, char** ip)
 
 static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
        celix_status_t status = CELIX_SUCCESS;
-       char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
+       char* scope_topic = 
createScopeTopicKey(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
        array_list_pt pendingListPerTopic = 
hashMap_get(admin->pendingSubscriptions,scope_topic);
        if(pendingListPerTopic==NULL){
                arrayList_create(&pendingListPerTopic);
@@ -887,9 +908,8 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, 
service_reference_pt
                                /* Remove the publication */
                                pubsubAdmin_removePublication(admin, pubEP);
                                /* Reset the endpoint field, so that will be 
recreated from scratch when a new serializer will be found */
-                               if(pubEP->endpoint!=NULL){
-                                       free(pubEP->endpoint);
-                                       pubEP->endpoint = NULL;
+                               if(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL)!=NULL){
+                                       properties_unset(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL);
                                }
                                /* Add the orphan endpoint to the noSerializer 
pending list */
                                
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
@@ -939,9 +959,8 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, 
service_reference_pt
                                /* Remove the subscription */
                                pubsubAdmin_removeSubscription(admin, subEP);
                                /* Reset the endpoint field, so that will be 
recreated from scratch when a new serializer will be found */
-                               if(subEP->endpoint!=NULL){
-                                       free(subEP->endpoint);
-                                       subEP->endpoint = NULL;
+                               if(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_URL)!=NULL){
+                                       properties_unset(subEP->endpoint_props, 
PUBSUB_ENDPOINT_URL);
                                }
                                /* Add the orphan endpoint to the noSerializer 
pending list */
                                
celixThreadMutex_lock(&admin->noSerializerPendingsLock);

http://git-wip-us.apache.org/repos/asf/celix/blob/9094f554/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c 
b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index 34c25b3..105d540 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -198,7 +198,7 @@ celix_status_t 
pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p
         memset(bindAddress, 0, EP_ADDRESS_LEN);
 
                snprintf(ep,EP_ADDRESS_LEN,"tcp://%s:%u",bindIP,port);
-        snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); 
//NOTE using a different bind addres than endpoint address
+        snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); 
//NOTE using a different bind address than endpoint address
                rv = zsock_bind (socket, "%s", bindAddress);
         if (rv == -1) {
             perror("Error for zmq_bind");
@@ -289,15 +289,16 @@ celix_status_t 
pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
                factory->ungetService = pubsub_topicPublicationUngetService;
 
                properties_pt props = properties_create();
-               properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
-               properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
+               
properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props,
 PUBSUB_ENDPOINT_TOPIC));
+               
properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props,
 PUBSUB_ENDPOINT_SCOPE));
                properties_set(props,"service.version", 
PUBSUB_PUBLISHER_SERVICE_VERSION);
 
                status = 
bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
 
                if(status != CELIX_SUCCESS){
                        properties_destroy(props);
-                       printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register 
ServiceFactory for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID);
+                       printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register 
ServiceFactory for topic %s (bundle %ld).\n",
+                                  properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),pubEP->serviceID);
                }
                else{
                        *svcFactory = factory;
@@ -318,7 +319,7 @@ celix_status_t 
pubsub_topicPublicationStop(topic_publication_pt pub){
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep){
 
        celixThreadMutex_lock(&(pub->tp_lock));
-       ep->endpoint = strdup(pub->endpoint);
+    pubsubEndpoint_setField(ep, PUBSUB_ENDPOINT_URL, pub->endpoint);
        arrayList_add(pub->pub_ep_list,ep);
        celixThreadMutex_unlock(&(pub->tp_lock));
 
@@ -587,7 +588,7 @@ static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(to
                arrayList_create(&bound->mp_parts);
 
                pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
-               bound->topic=strdup(pubEP->topic);
+               bound->topic=strdup(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
                bound->service.handle = bound;
                bound->service.localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForUUID;

http://git-wip-us.apache.org/repos/asf/celix/blob/9094f554/pubsub/pubsub_common/public/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_endpoint.h 
b/pubsub/pubsub_common/public/include/pubsub_endpoint.h
index 8a979eb..0a0b727 100644
--- a/pubsub/pubsub_common/public/include/pubsub_endpoint.h
+++ b/pubsub/pubsub_common/public/include/pubsub_endpoint.h
@@ -34,13 +34,19 @@
 #include "publisher.h"
 #include "subscriber.h"
 
+#define PUBSUB_ENDPOINT_ID              "pubsub.endpoint.id"
+#define PUBSUB_ENDPOINT_SERVICE_ID      "service.id"
+#define PUBSUB_ENDPOINT_SERIALIZER      "serializer"
+#define PUBSUB_ENDPOINT_ADMIN_TYPE      "pubsub.admin.type"
+#define PUBSUB_ENDPOINT_URL             "pubsub.endpoint"
+#define PUBSUB_ENDPOINT_TOPIC           "pubsub.topic"
+#define PUBSUB_ENDPOINT_SCOPE           "pubsub.scope"
+#define PUBSUB_ENDPOINT_TYPE            "pubsub.type"
+
 struct pubsub_endpoint {
-    char *frameworkUUID;
-    char *scope;
-    char *topic;
-    long serviceID;
-    char* endpoint;
-    bool is_secure;
+    long serviceID;         //optional
+    bool is_secure;         //optional
+    properties_pt endpoint_props;
     properties_pt topic_props;
 };
 
@@ -52,6 +58,7 @@ celix_status_t 
pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt i
 celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt 
*out);
 celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp);
 bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2);
+celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, 
const char* value);
 
 char *createScopeTopicKey(const char* scope, const char* topic);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/9094f554/pubsub/pubsub_common/public/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/pubsub_endpoint.c 
b/pubsub/pubsub_common/public/src/pubsub_endpoint.c
index c3fd293..d3b746e 100644
--- a/pubsub/pubsub_common/public/src/pubsub_endpoint.c
+++ b/pubsub/pubsub_common/public/src/pubsub_endpoint.c
@@ -26,6 +26,7 @@
 
 #include <string.h>
 #include <stdlib.h>
+#include <uuid/uuid.h>
 
 #include "celix_errno.h"
 #include "celix_log.h"
@@ -42,22 +43,33 @@ static properties_pt 
pubsubEndpoint_getTopicProperties(bundle_pt bundle, const c
 
 static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* 
fwUUID, const char* scope, const char* topic, long serviceId,const char* 
endpoint,properties_pt topic_props, bool cloneProps){
 
+       if (psEp->endpoint_props == NULL) {
+               psEp->endpoint_props = properties_create();
+       }
+
+       char endpointUuid[37];
+
+       uuid_t endpointUid;
+       uuid_generate(endpointUid);
+       uuid_unparse(endpointUid, endpointUuid);
+       properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_ID, endpointUuid);
+
        if (fwUUID != NULL) {
-               psEp->frameworkUUID = strdup(fwUUID);
+               properties_set(psEp->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, fwUUID);
        }
 
        if (scope != NULL) {
-               psEp->scope = strdup(scope);
+               properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_SCOPE, 
scope);
        }
 
        if (topic != NULL) {
-               psEp->topic = strdup(topic);
+               properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC, 
topic);
        }
 
        psEp->serviceID = serviceId;
 
        if(endpoint != NULL) {
-               psEp->endpoint = strdup(endpoint);
+               properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_URL, 
endpoint);
        }
 
        if(topic_props != NULL){
@@ -102,6 +114,21 @@ static properties_pt 
pubsubEndpoint_getTopicProperties(bundle_pt bundle, const c
        return topic_props;
 }
 
+celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, 
const char* value) {
+       celix_status_t status = CELIX_SUCCESS;
+
+       if (ep->endpoint_props == NULL) {
+               printf("PUBSUB_EP: No endpoint_props for endpoint 
available!\n");
+               return CELIX_ILLEGAL_STATE;
+       }
+
+       if (key != NULL && value != NULL) {
+               properties_set(ep->endpoint_props, key, value);
+       }
+
+       return status;
+}
+
 celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, 
const char* topic, long serviceId,const char* endpoint,properties_pt 
topic_props,pubsub_endpoint_pt* psEp){
        celix_status_t status = CELIX_SUCCESS;
 
@@ -116,9 +143,22 @@ celix_status_t pubsubEndpoint_create(const char* fwUUID, 
const char* scope, cons
 celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt 
*out){
        celix_status_t status = CELIX_SUCCESS;
 
-       *out = calloc(1,sizeof(**out));
+    pubsub_endpoint_pt ep = calloc(1,sizeof(*ep));
 
-       pubsubEndpoint_setFields(*out, in->frameworkUUID, in->scope, in->topic, 
in->serviceID, in->endpoint, in->topic_props, true);
+       status = properties_copy(in->endpoint_props, &(ep->endpoint_props));
+
+    if (in->topic_props != NULL) {
+        status += properties_copy(in->topic_props, &(ep->topic_props));
+    }
+
+       ep->serviceID = in->serviceID;
+       ep->is_secure = in->is_secure;
+
+    if (status == CELIX_SUCCESS) {
+        *out = ep;
+    } else {
+        pubsubEndpoint_destroy(ep);
+    }
 
        return status;
 
@@ -150,7 +190,11 @@ celix_status_t 
pubsubEndpoint_createFromServiceReference(service_reference_pt re
 
        pubsubEndpoint_setFields(ep, fwUUID, 
scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, 
strtol(serviceId,NULL,10), NULL, topic_props, false);
 
-       if (!ep->frameworkUUID || !ep->serviceID || !ep->scope || !ep->topic) {
+       if (!properties_get(ep->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID) 
||
+                       !ep->serviceID ||
+                       !properties_get(ep->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE) ||
+                       !properties_get(ep->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC)) {
+
                fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: 
incomplete description!.");
                status = CELIX_BUNDLE_EXCEPTION;
                pubsubEndpoint_destroy(ep);
@@ -206,30 +250,14 @@ celix_status_t 
pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt i
 
 celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
 
-       if(psEp->frameworkUUID!=NULL){
-               free(psEp->frameworkUUID);
-               psEp->frameworkUUID = NULL;
-       }
-
-       if(psEp->scope!=NULL){
-               free(psEp->scope);
-               psEp->scope = NULL;
-       }
-
-       if(psEp->topic!=NULL){
-               free(psEp->topic);
-               psEp->topic = NULL;
-       }
-
-       if(psEp->endpoint!=NULL){
-               free(psEp->endpoint);
-               psEp->endpoint = NULL;
-       }
-
        if(psEp->topic_props != NULL){
                properties_destroy(psEp->topic_props);
        }
 
+       if (psEp->endpoint_props != NULL) {
+               properties_destroy(psEp->endpoint_props);
+    }
+
        free(psEp);
 
        return CELIX_SUCCESS;
@@ -238,9 +266,9 @@ celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt 
psEp){
 
 bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
 
-       return ((strcmp(psEp1->frameworkUUID,psEp2->frameworkUUID)==0) &&
-                       (strcmp(psEp1->scope,psEp2->scope)==0) &&
-                       (strcmp(psEp1->topic,psEp2->topic)==0) &&
+       return ((strcmp(properties_get(psEp1->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),properties_get(psEp2->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID))==0) &&
+                       (strcmp(properties_get(psEp1->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),properties_get(psEp2->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE))==0) &&
+                       (strcmp(properties_get(psEp1->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),properties_get(psEp2->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC))==0) &&
                        (psEp1->serviceID == psEp2->serviceID) /*&&
                        ((psEp1->endpoint==NULL && 
psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/
        );

http://git-wip-us.apache.org/repos/asf/celix/blob/9094f554/pubsub/pubsub_discovery/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/etcd_watcher.c 
b/pubsub/pubsub_discovery/private/src/etcd_watcher.c
index 3c3a5a8..726269a 100644
--- a/pubsub/pubsub_discovery/private/src/etcd_watcher.c
+++ b/pubsub/pubsub_discovery/private/src/etcd_watcher.c
@@ -21,6 +21,7 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <string.h>
+#include <jansson.h>
 
 #include "celix_log.h"
 #include "constants.h"
@@ -143,7 +144,60 @@ celix_status_t 
etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsu
                        status = CELIX_ILLEGAL_STATE;
                }
                else{
-                       status = 
pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,NULL,pubEP);
+
+                       // etcdValue contains the json formatted string
+                       json_error_t error;
+                       json_t* jsonRoot = json_loads(etcdValue, 
JSON_DECODE_ANY, &error);
+
+                       const char* endpoint_serializer = NULL;
+                       const char* endpoint_admin_type = NULL;
+                       const char* endpoint_url = NULL;
+                       const char* endpoint_type = NULL;
+
+                       if (json_is_object(jsonRoot)){
+
+                               void *iter = json_object_iter(jsonRoot);
+
+                               const char *key;
+                               json_t *value;
+
+                               while (iter) {
+                                       key = json_object_iter_key(iter);
+                                       value = json_object_iter_value(iter);
+
+                                       if (strcmp(key, 
PUBSUB_ENDPOINT_SERIALIZER) == 0) {
+                                               endpoint_serializer = 
json_string_value(value);
+                                       } else if (strcmp(key, 
PUBSUB_ENDPOINT_ADMIN_TYPE) == 0) {
+                                               endpoint_admin_type = 
json_string_value(value);
+                                       } else if (strcmp(key, 
PUBSUB_ENDPOINT_URL) == 0) {
+                                               endpoint_url = 
json_string_value(value);
+                                       } else if (strcmp(key, 
PUBSUB_ENDPOINT_TYPE) == 0) {
+                                               endpoint_type = 
json_string_value(value);
+                                       }
+
+                                       iter = json_object_iter_next(jsonRoot, 
iter);
+                               }
+
+                               if (endpoint_url == NULL) {
+                                       printf("EW: No endpoint found in json 
object!\n");
+                                       endpoint_url = etcdValue;
+                               }
+
+                       } else {
+                               endpoint_url = etcdValue;
+                       }
+
+                       status = 
pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),endpoint_url,NULL,pubEP);
+
+            if (status == CELIX_SUCCESS) {
+                status += pubsubEndpoint_setField(*pubEP, 
PUBSUB_ENDPOINT_SERIALIZER, endpoint_serializer);
+                status += pubsubEndpoint_setField(*pubEP, 
PUBSUB_ENDPOINT_ADMIN_TYPE, endpoint_admin_type);
+                status += pubsubEndpoint_setField(*pubEP, 
PUBSUB_ENDPOINT_TYPE, endpoint_type);
+            }
+
+                       if (jsonRoot != NULL) {
+                               json_decref(jsonRoot);
+                       }
                }
        }
        return status;

http://git-wip-us.apache.org/repos/asf/celix/blob/9094f554/pubsub/pubsub_discovery/private/src/etcd_writer.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/etcd_writer.c 
b/pubsub/pubsub_discovery/private/src/etcd_writer.c
index 1c423f3..e820e50 100644
--- a/pubsub/pubsub_discovery/private/src/etcd_writer.c
+++ b/pubsub/pubsub_discovery/private/src/etcd_writer.c
@@ -21,6 +21,7 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <string.h>
+#include <jansson.h>
 
 #include "celix_log.h"
 #include "constants.h"
@@ -82,7 +83,12 @@ void etcdWriter_destroy(etcd_writer_pt writer) {
        for(int i = 0; i < arrayList_size(writer->localPubs); i++) {
                pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i);
                memset(dir,0,MAX_ROOTNODE_LENGTH);
-               
snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID);
+               snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",
+                                rootPath,
+                                properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),
+                                properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),
+                                properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID));
+
                etcd_del(dir);
                pubsubEndpoint_destroy(pubEP);
        }
@@ -100,7 +106,7 @@ celix_status_t 
etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_end
        if(storeEP){
                const char *fwUUID = NULL;
                bundleContext_getProperty(writer->pubsub_discovery->context, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
-               if(fwUUID && strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
+               if(fwUUID && strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) {
                        celixThreadMutex_lock(&writer->localPubsLock);
                        pubsub_endpoint_pt p = NULL;
                        pubsubEndpoint_clone(pubEP, &p);
@@ -128,12 +134,33 @@ celix_status_t 
etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_end
 
        const char *rootPath = 
etcdWriter_getRootPath(writer->pubsub_discovery->context);
 
-       
asprintf(&key,"%s/%s/%s/%s/%ld",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID,pubEP->serviceID);
-
-       if(!etcd_set(key,pubEP->endpoint,ttl,false)){
+       asprintf(&key,"%s/%s/%s/%s/%ld",
+                        rootPath,
+                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),
+                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),
+                        properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
+                        pubEP->serviceID);
+
+    char serviceID [sizeof(pubEP->serviceID)];
+    snprintf(serviceID, sizeof(pubEP->serviceID), "%ld", pubEP->serviceID);
+       json_t* jsonEndpoint = json_pack("{s:s,s:s,s:s,s:s,s:s,s:s,s:s}",
+                                                                        
PUBSUB_ENDPOINT_SERVICE_ID, serviceID,
+                                                                        
PUBSUB_ENDPOINT_SERIALIZER, "serializer.json", //TODO: Serializer not (yet) 
stored in endpoint
+                                                                        
PUBSUB_ENDPOINT_ADMIN_TYPE, "zmq", //TODO: PSA type not (yet) stored in endpoint
+                                                                        
PUBSUB_ENDPOINT_URL, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),
+                                                                        
PUBSUB_ENDPOINT_TYPE, "publisher", //TODO: Check if necessary
+                                                                        
PUBSUB_ENDPOINT_TOPIC, properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),
+                                                                        
PUBSUB_ENDPOINT_SCOPE, properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE)
+       );
+       char* jsonEndpointStr = json_dumps(jsonEndpoint, JSON_COMPACT);
+
+       if (!etcd_set(key,jsonEndpointStr,ttl,false)) {
                status = CELIX_ILLEGAL_ARGUMENT;
        }
        FREE_MEM(key);
+       FREE_MEM(jsonEndpointStr);
+       json_decref(jsonEndpoint);
+
        return status;
 }
 
@@ -143,7 +170,12 @@ celix_status_t 
etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_
 
        const char *rootPath = 
etcdWriter_getRootPath(writer->pubsub_discovery->context);
 
-       asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, 
pubEP->frameworkUUID, pubEP->serviceID);
+       asprintf(&key, "%s/%s/%s/%s/%ld",
+                        rootPath,
+                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),
+                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),
+                        properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
+                        pubEP->serviceID);
 
        celixThreadMutex_lock(&writer->localPubsLock);
        for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) {

http://git-wip-us.apache.org/repos/asf/celix/blob/9094f554/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c 
b/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
index 94a8e11..e3e9704 100644
--- a/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
+++ b/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
@@ -143,7 +143,7 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt 
ps_discovery) {
         int i;
         for (i = 0; i < arrayList_size(pubEP_list); i++) {
             pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) 
arrayList_get(pubEP_list, i);
-            if (strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
+            if (strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) {
                 etcdWriter_deletePublisherEndpoint(ps_discovery->writer, 
pubEP);
             } else {
                 pubsub_discovery_informPublishersListeners(ps_discovery, 
pubEP, false);
@@ -177,7 +177,7 @@ celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt 
pubsub_discovery, pu
        bool inform=false;
        celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
 
-       char *pubs_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
+       char *pubs_key = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
        array_list_pt pubEP_list = 
(array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pubs_key);
        if(pubEP_list==NULL){
                arrayList_create(&pubEP_list);
@@ -216,11 +216,12 @@ celix_status_t 
pubsub_discovery_removeNode(pubsub_discovery_pt pubsub_discovery,
     bool found = false;
 
     celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
-    char *pubs_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
+    char *pubs_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
     array_list_pt pubEP_list = (array_list_pt) 
hashMap_get(pubsub_discovery->discoveredPubs, pubs_key);
     free(pubs_key);
     if (pubEP_list == NULL) {
-        printf("PSD: Cannot find any registered publisher for topic %s. 
Something is not consistent.\n", pubEP->topic);
+        printf("PSD: Cannot find any registered publisher for topic %s. 
Something is not consistent.\n",
+                          properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
         status = CELIX_ILLEGAL_STATE;
     } else {
         int i;
@@ -278,12 +279,14 @@ celix_status_t 
pubsub_discovery_informPublishersListeners(pubsub_discovery_pt pu
 /* Service's functions implementation */
 celix_status_t pubsub_discovery_announcePublisher(void *handle, 
pubsub_endpoint_pt pubEP) {
        celix_status_t status = CELIX_SUCCESS;
-       printf("pubsub_discovery_announcePublisher : %s / %s\n", pubEP->topic, 
pubEP->endpoint);
+       printf("pubsub_discovery_announcePublisher : %s / %s\n",
+                  properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
+                  properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
        pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
 
        celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
 
-       char *pub_key = createScopeTopicKey(pubEP->scope,pubEP->topic);
+       char *pub_key = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
        array_list_pt pubEP_list = 
(array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
 
        if(pubEP_list==NULL){
@@ -310,11 +313,11 @@ celix_status_t pubsub_discovery_removePublisher(void 
*handle, pubsub_endpoint_pt
 
        celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
 
-       char *pub_key = createScopeTopicKey(pubEP->scope,pubEP->topic);
+       char *pub_key = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
        array_list_pt pubEP_list = 
(array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
        free(pub_key);
        if(pubEP_list==NULL){
-               printf("PSD: Cannot find any registered publisher for topic %s. 
Something is not consistent.\n",pubEP->topic);
+               printf("PSD: Cannot find any registered publisher for topic %s. 
Something is not consistent.\n",properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
                status = CELIX_ILLEGAL_STATE;
        }
        else{

http://git-wip-us.apache.org/repos/asf/celix/blob/9094f554/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git 
a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c 
b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index 987d864..4ee9951 100644
--- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -216,7 +216,7 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * 
handle, service_referenc
                                int i;
                                for(i=0;i<arrayList_size(pubEP_list);i++){
                                        pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-                                       
if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
+                                       
if(strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
                                                
disc->removePublisher(disc->handle,pubEP);
                                        }
                                }
@@ -263,7 +263,7 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void 
* handle, service_ref
        pubsub_endpoint_pt sub = NULL;
        if(pubsubEndpoint_createFromServiceReference(reference,&sub,false) == 
CELIX_SUCCESS){
                celixThreadMutex_lock(&manager->subscriptionsLock);
-               char *sub_key = createScopeTopicKey(sub->scope, sub->topic);
+               char *sub_key = 
createScopeTopicKey(properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), 
properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 
                array_list_pt sub_list_by_topic = 
hashMap_get(manager->subscriptions,sub_key);
                if(sub_list_by_topic==NULL){
@@ -300,7 +300,7 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void 
* handle, service_ref
                        service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
                        publisher_endpoint_announce_pt disc = NULL;
                        bundleContext_getService(manager->context, disc_sr, 
(void**) &disc);
-                       disc->interestedInTopic(disc->handle, sub->scope, 
sub->topic);
+                       disc->interestedInTopic(disc->handle, 
properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), 
properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
                        bundleContext_ungetService(manager->context, disc_sr, 
NULL);
                }
                hashMapIterator_destroy(iter);
@@ -339,7 +339,7 @@ celix_status_t 
pubsub_topologyManager_subscriberRemoved(void * handle, service_r
                        service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
                        publisher_endpoint_announce_pt disc = NULL;
                        bundleContext_getService(manager->context, disc_sr, 
(void**) &disc);
-                       disc->uninterestedInTopic(disc->handle, subcmp->scope, 
subcmp->topic);
+                       disc->uninterestedInTopic(disc->handle, 
properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), 
properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
                        bundleContext_ungetService(manager->context, disc_sr, 
NULL);
                }
                hashMapIterator_destroy(iter);
@@ -348,7 +348,7 @@ celix_status_t 
pubsub_topologyManager_subscriberRemoved(void * handle, service_r
                celixThreadMutex_lock(&manager->subscriptionsLock);
                celixThreadMutex_lock(&manager->psaListLock);
 
-               char *sub_key = 
createScopeTopicKey(subcmp->scope,subcmp->topic);
+               char *sub_key = 
createScopeTopicKey(properties_get(subcmp->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),properties_get(subcmp->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
                array_list_pt sub_list_by_topic = 
hashMap_get(manager->subscriptions,sub_key);
                free(sub_key);
                if(sub_list_by_topic!=NULL){
@@ -368,7 +368,7 @@ celix_status_t 
pubsub_topologyManager_subscriberRemoved(void * handle, service_r
                                if(arrayList_size(sub_list_by_topic)==0){
                                        
for(k=0;k<arrayList_size(manager->psaList);k++){
                                                pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-                                               
psa->closeAllSubscriptions(psa->admin,sub->scope, sub->topic);
+                                               
psa->closeAllSubscriptions(psa->admin, (char*) 
properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) 
properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
                                        }
                                }
 
@@ -415,7 +415,7 @@ celix_status_t 
pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
                array_list_pt pubEP_list = 
(array_list_pt)hashMapIterator_nextValue(iter);
                for(int i = 0; i < arrayList_size(pubEP_list); i++) {
                        pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-                       if( (strcmp(pubEP->frameworkUUID,fwUUID)==0) && 
(pubEP->endpoint!=NULL)){
+                       if( (strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0) && 
(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL)){
                                status += 
disc->announcePublisher(disc->handle,pubEP);
                        }
                }
@@ -433,7 +433,7 @@ celix_status_t 
pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
                for(i=0;i<arrayList_size(l);i++){
                        pubsub_endpoint_pt subEp = 
(pubsub_endpoint_pt)arrayList_get(l,i);
 
-                       disc->interestedInTopic(disc->handle, subEp->scope, 
subEp->topic);
+                       disc->interestedInTopic(disc->handle, 
properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), 
properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
                }
        }
        hashMapIterator_destroy(iter);
@@ -486,7 +486,7 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
                if(pubsubEndpoint_createFromListenerHookInfo(info, &pub, true) 
== CELIX_SUCCESS){
 
                        celixThreadMutex_lock(&manager->publicationsLock);
-                       char *pub_key = createScopeTopicKey(pub->scope, 
pub->topic);
+                       char *pub_key = 
createScopeTopicKey(properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), 
properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
                        array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications, pub_key);
                        if(pub_list_by_topic==NULL){
                                arrayList_create(&pub_list_by_topic);
@@ -558,7 +558,7 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
                        celixThreadMutex_lock(&manager->psaListLock);
                        celixThreadMutex_lock(&manager->publicationsLock);
 
-                       char *pub_key = createScopeTopicKey(pubcmp->scope, 
pubcmp->topic);
+                       char *pub_key = 
createScopeTopicKey(properties_get(pubcmp->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubcmp->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
                        array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
                        if(pub_list_by_topic!=NULL){
                                
for(j=0;j<arrayList_size(pub_list_by_topic);j++){
@@ -591,7 +591,7 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
                                                
if(arrayList_size(pub_list_by_topic)==0){
                                                        
for(k=0;k<arrayList_size(manager->psaList);k++){
                                                                
pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-                                                               
psa->closeAllPublications(psa->admin,pub->scope, pub->topic);
+                                                               
psa->closeAllPublications(psa->admin, (char*) 
properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) 
properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
                                                        }
                                                }
 
@@ -617,13 +617,16 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
 
 celix_status_t pubsub_topologyManager_announcePublisher(void *handle, 
pubsub_endpoint_pt pubEP){
        celix_status_t status = CELIX_SUCCESS;
-       printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, 
ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint);
+       printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, 
ep=%s]\n",
+                  properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
+                  properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
+                  properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 
        pubsub_topology_manager_pt manager = handle;
        celixThreadMutex_lock(&manager->psaListLock);
        celixThreadMutex_lock(&manager->publicationsLock);
 
-       char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
+       char *pub_key = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 
        array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
        if(pub_list_by_topic==NULL){
@@ -666,17 +669,20 @@ celix_status_t 
pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
 
 celix_status_t pubsub_topologyManager_removePublisher(void *handle, 
pubsub_endpoint_pt pubEP){
        celix_status_t status = CELIX_SUCCESS;
-       printf("PSTM: Publisher removed for topic %s [fwUUID=%s, 
ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint);
+       printf("PSTM: Publisher removed for topic %s [fwUUID=%s, ep=%s]\n",
+                  properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
+                  properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
+                  properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 
        pubsub_topology_manager_pt manager = handle;
        celixThreadMutex_lock(&manager->psaListLock);
        celixThreadMutex_lock(&manager->publicationsLock);
        int i;
 
-       char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
+       char *pub_key = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
        array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
        if(pub_list_by_topic==NULL){
-               printf("PSTM: ERROR: Cannot find topic for known endpoint 
[%s,%s,%s]. Something is 
inconsistent.\n",pub_key,pubEP->frameworkUUID,pubEP->endpoint);
+               printf("PSTM: ERROR: Cannot find topic for known endpoint 
[%s,%s,%s]. Something is 
inconsistent.\n",pub_key,properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL));
                status = CELIX_ILLEGAL_STATE;
        }
        else{
@@ -704,7 +710,7 @@ celix_status_t pubsub_topologyManager_removePublisher(void 
*handle, pubsub_endpo
 
                                for(i=0;i<arrayList_size(manager->psaList);i++){
                                        pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-                                       
psa->closeAllPublications(psa->admin,p->scope, p->topic);
+                                       psa->closeAllPublications(psa->admin, 
(char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) 
properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
                                }
                        }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/9094f554/utils/private/src/properties.c
----------------------------------------------------------------------
diff --git a/utils/private/src/properties.c b/utils/private/src/properties.c
index 1e097a0..860b9bb 100644
--- a/utils/private/src/properties.c
+++ b/utils/private/src/properties.c
@@ -212,6 +212,11 @@ void properties_set(properties_pt properties, const char* 
key, const char* value
        free(oldValue);
 }
 
+void properties_unset(properties_pt properties, const char* key) { /* Remove the entry for `key` from `properties` and free the value that was stored under it. */
+       char* oldValue = hashMap_remove(properties, key); /* whatever hashMap_remove hands back is treated as the old, heap-allocated value */
+       free(oldValue); /* free(NULL) is a no-op, so no guard is needed; NOTE(review): only the value is released here - confirm whether the map's stored key copy must be freed too */
+}
+
 static void updateBuffers(char **key, char ** value, char **output, int 
outputPos, int *key_len, int *value_len) {
        if (*output == *key) {
                if (outputPos == (*key_len) - 1) {

http://git-wip-us.apache.org/repos/asf/celix/blob/9094f554/utils/public/include/properties.h
----------------------------------------------------------------------
diff --git a/utils/public/include/properties.h 
b/utils/public/include/properties.h
index 5c6dc4d..582a242 100644
--- a/utils/public/include/properties.h
+++ b/utils/public/include/properties.h
@@ -56,6 +56,8 @@ UTILS_EXPORT const char 
*properties_getWithDefault(properties_pt properties, con
 
 UTILS_EXPORT void properties_set(properties_pt properties, const char *key, 
const char *value);
 
+UTILS_EXPORT void properties_unset(properties_pt properties, const char *key); /* Removes the entry for `key` from `properties` and frees its stored value. */
+
 UTILS_EXPORT celix_status_t properties_copy(properties_pt properties, 
properties_pt *copy);
 
 #define PROPERTIES_FOR_EACH(props, key) \

Reply via email to