Repository: celix
Updated Branches:
  refs/heads/develop a55fd1523 -> 109edf4d0


http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_api/include/pubsub/publisher.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_api/include/pubsub/publisher.h 
b/pubsub/pubsub_api/include/pubsub/publisher.h
index 3eec149..9f7f3b6 100644
--- a/pubsub/pubsub_api/include/pubsub/publisher.h
+++ b/pubsub/pubsub_api/include/pubsub/publisher.h
@@ -30,12 +30,11 @@
 #include <stdlib.h>
 
 #define PUBSUB_PUBLISHER_SERVICE_NAME           "pubsub.publisher"
-#define PUBSUB_PUBLISHER_SERVICE_VERSION             "2.0.0"
+#define PUBSUB_PUBLISHER_SERVICE_VERSION           "2.0.0"
  
 //properties
-#define PUBSUB_PUBLISHER_TOPIC                  "pubsub.topic"
-#define PUBSUB_PUBLISHER_SCOPE                  "pubsub.scope"
-#define PUBSUB_PUBLISHER_STRATEGY               "pubsub.strategy"
+#define PUBSUB_PUBLISHER_TOPIC                  "topic"
+#define PUBSUB_PUBLISHER_SCOPE                  "scope"
 #define PUBSUB_PUBLISHER_CONFIG                 "pubsub.config"
  
 #define PUBSUB_PUBLISHER_SCOPE_DEFAULT                 "default"

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_api/include/pubsub/subscriber.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_api/include/pubsub/subscriber.h 
b/pubsub/pubsub_api/include/pubsub/subscriber.h
index 5d87b8a..ca6d4d1 100644
--- a/pubsub/pubsub_api/include/pubsub/subscriber.h
+++ b/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -33,9 +33,8 @@
 #define PUBSUB_SUBSCRIBER_SERVICE_VERSION       "2.0.0"
  
 //properties
-#define PUBSUB_SUBSCRIBER_TOPIC                "pubsub.topic"
-#define PUBSUB_SUBSCRIBER_SCOPE                "pubsub.scope"
-#define PUBSUB_SUBSCRIBER_STRATEGY             "pubsub.strategy"
+#define PUBSUB_SUBSCRIBER_TOPIC                "topic"
+#define PUBSUB_SUBSCRIBER_SCOPE                "scope"
 #define PUBSUB_SUBSCRIBER_CONFIG               "pubsub.config"
 
 #define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT        "default"

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_discovery/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/etcd_watcher.c 
b/pubsub/pubsub_discovery/src/etcd_watcher.c
index 726269a..fe80c14 100644
--- a/pubsub/pubsub_discovery/src/etcd_watcher.c
+++ b/pubsub/pubsub_discovery/src/etcd_watcher.c
@@ -127,18 +127,18 @@ celix_status_t 
etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsu
        char scope[MAX_FIELD_LENGTH];
        char topic[MAX_FIELD_LENGTH];
        char fwUUID[MAX_FIELD_LENGTH];
-       char serviceId[MAX_FIELD_LENGTH];
+       char pubsubUUID[MAX_FIELD_LENGTH];
 
        memset(rootPath,0,MAX_ROOTNODE_LENGTH);
        memset(topic,0,MAX_FIELD_LENGTH);
        memset(fwUUID,0,MAX_FIELD_LENGTH);
-       memset(serviceId,0,MAX_FIELD_LENGTH);
+       memset(pubsubUUID,0,MAX_FIELD_LENGTH);
 
        etcdWatcher_getRootPath(pubsub_discovery->context, rootPath);
 
        asprintf(&expr, "/%s/%%[^/]/%%[^/]/%%[^/]/%%[^/].*", rootPath);
        if(expr) {
-               int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, 
serviceId);
+               int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, 
pubsubUUID);
                free(expr);
                if (foundItems != 4) { // Could happen when a directory is 
removed, just don't process this.
                        status = CELIX_ILLEGAL_STATE;
@@ -149,50 +149,28 @@ celix_status_t 
etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsu
                        json_error_t error;
                        json_t* jsonRoot = json_loads(etcdValue, 
JSON_DECODE_ANY, &error);
 
-                       const char* endpoint_serializer = NULL;
-                       const char* endpoint_admin_type = NULL;
-                       const char* endpoint_url = NULL;
-                       const char* endpoint_type = NULL;
+                       properties_t *discovered_props = properties_create();
 
-                       if (json_is_object(jsonRoot)){
+                       if (json_is_object(jsonRoot)) {
 
-                               void *iter = json_object_iter(jsonRoot);
+                void *iter = json_object_iter(jsonRoot);
 
-                               const char *key;
-                               json_t *value;
+                const char *key;
+                json_t *value;
 
-                               while (iter) {
-                                       key = json_object_iter_key(iter);
-                                       value = json_object_iter_value(iter);
+                while (iter) {
+                    key = json_object_iter_key(iter);
+                    value = json_object_iter_value(iter);
 
-                                       if (strcmp(key, 
PUBSUB_ENDPOINT_SERIALIZER) == 0) {
-                                               endpoint_serializer = 
json_string_value(value);
-                                       } else if (strcmp(key, 
PUBSUB_ENDPOINT_ADMIN_TYPE) == 0) {
-                                               endpoint_admin_type = 
json_string_value(value);
-                                       } else if (strcmp(key, 
PUBSUB_ENDPOINT_URL) == 0) {
-                                               endpoint_url = 
json_string_value(value);
-                                       } else if (strcmp(key, 
PUBSUB_ENDPOINT_TYPE) == 0) {
-                                               endpoint_type = 
json_string_value(value);
-                                       }
-
-                                       iter = json_object_iter_next(jsonRoot, 
iter);
-                               }
-
-                               if (endpoint_url == NULL) {
-                                       printf("EW: No endpoint found in json 
object!\n");
-                                       endpoint_url = etcdValue;
-                               }
-
-                       } else {
-                               endpoint_url = etcdValue;
-                       }
+                    properties_set(discovered_props, key, 
json_string_value(value));
+                    iter = json_object_iter_next(jsonRoot, iter);
+                }
+            }
 
-                       status = 
pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),endpoint_url,NULL,pubEP);
 
-            if (status == CELIX_SUCCESS) {
-                status += pubsubEndpoint_setField(*pubEP, 
PUBSUB_ENDPOINT_SERIALIZER, endpoint_serializer);
-                status += pubsubEndpoint_setField(*pubEP, 
PUBSUB_ENDPOINT_ADMIN_TYPE, endpoint_admin_type);
-                status += pubsubEndpoint_setField(*pubEP, 
PUBSUB_ENDPOINT_TYPE, endpoint_type);
+            status = 
pubsubEndpoint_createFromDiscoveredProperties(discovered_props, pubEP);
+            if (status != CELIX_SUCCESS) {
+                properties_destroy(discovered_props);
             }
 
                        if (jsonRoot != NULL) {

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_discovery/src/etcd_writer.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/etcd_writer.c 
b/pubsub/pubsub_discovery/src/etcd_writer.c
index e820e50..37220cc 100644
--- a/pubsub/pubsub_discovery/src/etcd_writer.c
+++ b/pubsub/pubsub_discovery/src/etcd_writer.c
@@ -85,9 +85,9 @@ void etcdWriter_destroy(etcd_writer_pt writer) {
                memset(dir,0,MAX_ROOTNODE_LENGTH);
                snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",
                                 rootPath,
-                                properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),
-                                properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),
-                                properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID));
+                                properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                                properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME),
+                                properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID));
 
                etcd_del(dir);
                pubsubEndpoint_destroy(pubEP);
@@ -106,7 +106,7 @@ celix_status_t 
etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_end
        if(storeEP){
                const char *fwUUID = NULL;
                bundleContext_getProperty(writer->pubsub_discovery->context, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
-               if(fwUUID && strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) {
+               if(fwUUID && strcmp(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) {
                        celixThreadMutex_lock(&writer->localPubsLock);
                        pubsub_endpoint_pt p = NULL;
                        pubsubEndpoint_clone(pubEP, &p);
@@ -134,32 +134,28 @@ celix_status_t 
etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_end
 
        const char *rootPath = 
etcdWriter_getRootPath(writer->pubsub_discovery->context);
 
-       asprintf(&key,"%s/%s/%s/%s/%ld",
+       asprintf(&key,"%s/%s/%s/%s/%s",
                         rootPath,
-                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),
-                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),
-                        properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
-                        pubEP->serviceID);
-
-    char serviceID [sizeof(pubEP->serviceID)];
-    snprintf(serviceID, sizeof(pubEP->serviceID), "%ld", pubEP->serviceID);
-       json_t* jsonEndpoint = json_pack("{s:s,s:s,s:s,s:s,s:s,s:s,s:s}",
-                                                                        
PUBSUB_ENDPOINT_SERVICE_ID, serviceID,
-                                                                        
PUBSUB_ENDPOINT_SERIALIZER, "serializer.json", //TODO: Serializer not (yet) 
stored in endpoint
-                                                                        
PUBSUB_ENDPOINT_ADMIN_TYPE, "zmq", //TODO: PSA type not (yet) stored in endpoint
-                                                                        
PUBSUB_ENDPOINT_URL, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),
-                                                                        
PUBSUB_ENDPOINT_TYPE, "publisher", //TODO: Check if necessary
-                                                                        
PUBSUB_ENDPOINT_TOPIC, properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),
-                                                                        
PUBSUB_ENDPOINT_SCOPE, properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE)
-       );
-       char* jsonEndpointStr = json_dumps(jsonEndpoint, JSON_COMPACT);
+                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME),
+                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_UUID));
+
+
+       json_t *jsEndpoint = json_object();
+       const char* propKey = NULL;
+       PROPERTIES_FOR_EACH(pubEP->endpoint_props, propKey) {
+               const char* val = properties_get(pubEP->endpoint_props, 
propKey);
+               json_t* jsVal = json_string(val);
+               json_object_set(jsEndpoint, propKey, jsVal);
+       }
+       char* jsonEndpointStr = json_dumps(jsEndpoint, JSON_COMPACT);
 
        if (!etcd_set(key,jsonEndpointStr,ttl,false)) {
                status = CELIX_ILLEGAL_ARGUMENT;
        }
-       FREE_MEM(key);
        FREE_MEM(jsonEndpointStr);
-       json_decref(jsonEndpoint);
+       json_decref(jsEndpoint);
 
        return status;
 }
@@ -170,12 +166,12 @@ celix_status_t 
etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_
 
        const char *rootPath = 
etcdWriter_getRootPath(writer->pubsub_discovery->context);
 
-       asprintf(&key, "%s/%s/%s/%s/%ld",
+       asprintf(&key, "%s/%s/%s/%s/%s",
                         rootPath,
-                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),
-                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),
-                        properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
-                        pubEP->serviceID);
+                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME),
+                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+                        properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_UUID));
 
        celixThreadMutex_lock(&writer->localPubsLock);
        for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) {

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_discovery/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/psd_activator.c 
b/pubsub/pubsub_discovery/src/psd_activator.c
index 89a517d..ad1cc4a 100644
--- a/pubsub/pubsub_discovery/src/psd_activator.c
+++ b/pubsub/pubsub_discovery/src/psd_activator.c
@@ -29,7 +29,6 @@
 
 #include "pubsub_common.h"
 #include "publisher_endpoint_announce.h"
-#include "pubsub_discovery.h"
 #include "pubsub_discovery_impl.h"
 
 struct activator {

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c 
b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index e3e9704..0c075fc 100644
--- a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -42,6 +42,8 @@
 #include "pubsub_endpoint.h"
 #include "pubsub_discovery_impl.h"
 
+static bool pubsub_discovery_isEndpointValid(pubsub_endpoint_pt psEp);
+
 /* Discovery activator functions */
 celix_status_t pubsub_discovery_create(bundle_context_pt context, 
pubsub_discovery_pt *ps_discovery) {
        celix_status_t status = CELIX_SUCCESS;
@@ -56,11 +58,18 @@ celix_status_t pubsub_discovery_create(bundle_context_pt 
context, pubsub_discove
                (*ps_discovery)->discoveredPubs = 
hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
                (*ps_discovery)->listenerReferences = 
hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
                (*ps_discovery)->watchers = 
hashMap_create(utils_stringHash,NULL,utils_stringEquals, NULL);
+               (*ps_discovery)->verbose = 
PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE;
                
celixThreadMutex_create(&(*ps_discovery)->listenerReferencesMutex, NULL);
                celixThreadMutex_create(&(*ps_discovery)->discoveredPubsMutex, 
NULL);
                celixThreadMutex_create(&(*ps_discovery)->watchersMutex, NULL);
        }
 
+       const char *verboseStr = NULL;
+       bundleContext_getProperty(context, PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY, 
&verboseStr);
+       if (verboseStr != NULL) {
+               (*ps_discovery)->verbose = strncasecmp("true", verboseStr, 
strlen("true")) == 0;
+       }
+
        return status;
 }
 
@@ -119,7 +128,7 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt 
ps_discovery) {
 
     bundleContext_getProperty(ps_discovery->context, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
     if (fwUUID == NULL) {
-        printf("PSD: Cannot retrieve fwUUID.\n");
+        fprintf(stderr, "ERROR PSD: Cannot retrieve fwUUID.\n");
         return CELIX_INVALID_BUNDLE_CONTEXT;
     }
 
@@ -143,7 +152,7 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt 
ps_discovery) {
         int i;
         for (i = 0; i < arrayList_size(pubEP_list); i++) {
             pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) 
arrayList_get(pubEP_list, i);
-            if (strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) {
+            if (strcmp(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) {
                 etcdWriter_deletePublisherEndpoint(ps_discovery->writer, 
pubEP);
             } else {
                 pubsub_discovery_informPublishersListeners(ps_discovery, 
pubEP, false);
@@ -174,10 +183,17 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt 
ps_discovery) {
 
 celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt pubsub_discovery, 
pubsub_endpoint_pt pubEP) {
        celix_status_t status = CELIX_SUCCESS;
-       bool inform=false;
+
+       bool valid = pubsub_discovery_isEndpointValid(pubEP);
+       if (!valid) {
+               status = CELIX_ILLEGAL_STATE;
+               return status;
+       }
+
+       bool inform = false;
        celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
 
-       char *pubs_key = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
+       char *pubs_key = 
pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME));
        array_list_pt pubEP_list = 
(array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pubs_key);
        if(pubEP_list==NULL){
                arrayList_create(&pubEP_list);
@@ -216,12 +232,12 @@ celix_status_t 
pubsub_discovery_removeNode(pubsub_discovery_pt pubsub_discovery,
     bool found = false;
 
     celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
-    char *pubs_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
+    char *pubs_key = 
pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME));
     array_list_pt pubEP_list = (array_list_pt) 
hashMap_get(pubsub_discovery->discoveredPubs, pubs_key);
     free(pubs_key);
     if (pubEP_list == NULL) {
-        printf("PSD: Cannot find any registered publisher for topic %s. 
Something is not consistent.\n",
-                          properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
+        printf("WARNING PSD: Cannot find any registered publisher for topic 
%s. Something is not consistent.\n",
+                          properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME));
         status = CELIX_ILLEGAL_STATE;
     } else {
         int i;
@@ -279,14 +295,25 @@ celix_status_t 
pubsub_discovery_informPublishersListeners(pubsub_discovery_pt pu
 /* Service's functions implementation */
 celix_status_t pubsub_discovery_announcePublisher(void *handle, 
pubsub_endpoint_pt pubEP) {
        celix_status_t status = CELIX_SUCCESS;
-       printf("pubsub_discovery_announcePublisher : %s / %s\n",
-                  properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
-                  properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
        pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
 
+       bool valid = pubsub_discovery_isEndpointValid(pubEP);
+       if (!valid) {
+               status = CELIX_ILLEGAL_ARGUMENT;
+               return status;
+       }
+
+       if (pubsub_discovery->verbose) {
+               printf("pubsub_discovery_announcePublisher : %s / %s\n",
+                          properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME),
+                          properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL));
+       }
+
+
+
        celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
 
-       char *pub_key = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
+       char *pub_key = 
pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME));
        array_list_pt pubEP_list = 
(array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
 
        if(pubEP_list==NULL){
@@ -308,16 +335,21 @@ celix_status_t pubsub_discovery_announcePublisher(void 
*handle, pubsub_endpoint_
 
 celix_status_t pubsub_discovery_removePublisher(void *handle, 
pubsub_endpoint_pt pubEP) {
        celix_status_t status = CELIX_SUCCESS;
-
        pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
 
+       bool valid = pubsub_discovery_isEndpointValid(pubEP);
+       if (!valid) {
+               status = CELIX_ILLEGAL_ARGUMENT;
+               return status;
+       }
+
        celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
 
-       char *pub_key = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
+       char *pub_key = 
pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME));
        array_list_pt pubEP_list = 
(array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
        free(pub_key);
        if(pubEP_list==NULL){
-               printf("PSD: Cannot find any registered publisher for topic %s. 
Something is not consistent.\n",properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
+               printf("WARNING PSD: Cannot find any registered publisher for 
topic %s. Something is not consistent.\n",properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME));
                status = CELIX_ILLEGAL_STATE;
        }
        else{
@@ -332,7 +364,7 @@ celix_status_t pubsub_discovery_removePublisher(void 
*handle, pubsub_endpoint_pt
                }
 
                if(!found){
-                       printf("PSD: Trying to remove a not existing endpoint. 
Something is not consistent.\n");
+                       printf("WARNING PSD: Trying to remove a not existing 
endpoint. Something is not consistent.\n");
                        status = CELIX_ILLEGAL_STATE;
                }
                else{
@@ -353,7 +385,7 @@ celix_status_t pubsub_discovery_removePublisher(void 
*handle, pubsub_endpoint_pt
 celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* 
scope, const char* topic) {
     pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
 
-    char *scope_topic_key = createScopeTopicKey(scope, topic);
+    char *scope_topic_key = pubsubEndpoint_createScopeTopicKey(scope, topic);
     celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
     struct watcher_info * wi = hashMap_get(pubsub_discovery->watchers, 
scope_topic_key);
     if(wi) {
@@ -374,7 +406,7 @@ celix_status_t pubsub_discovery_interestedInTopic(void 
*handle, const char* scop
 celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* 
scope, const char* topic) {
     pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
 
-    char *scope_topic_key = createScopeTopicKey(scope, topic);
+    char *scope_topic_key = pubsubEndpoint_createScopeTopicKey(scope, topic);
     celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
 
     hash_map_entry_pt entry =  hashMap_getEntry(pubsub_discovery->watchers, 
scope_topic_key);
@@ -426,7 +458,9 @@ celix_status_t 
pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_
        celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
        celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
 
-       printf("PSD: pubsub_tm_announce_publisher added.\n");
+       if (pubsub_discovery->verbose) {
+               printf("PSD: pubsub_tm_announce_publisher added.\n");
+       }
 
        return status;
 }
@@ -450,7 +484,9 @@ celix_status_t 
pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, servic
 
        if (pubsub_discovery->listenerReferences != NULL) {
                if (hashMap_remove(pubsub_discovery->listenerReferences, 
reference)) {
-                       printf("PSD: pubsub_tm_announce_publisher removed.\n");
+                       if (pubsub_discovery->verbose) {
+                               printf("PSD: pubsub_tm_announce_publisher 
removed.\n");
+                       }
                }
        }
        celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
@@ -458,3 +494,38 @@ celix_status_t 
pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, servic
        return status;
 }
 
+static bool pubsub_discovery_isEndpointValid(pubsub_endpoint_pt psEp) {
+       //required properties
+       bool valid = true;
+       static const char* keys[] = {
+                       PUBSUB_ENDPOINT_UUID,
+                       PUBSUB_ENDPOINT_FRAMEWORK_UUID,
+                       PUBSUB_ENDPOINT_TYPE,
+                       PUBSUB_ENDPOINT_ADMIN_TYPE,
+                       PUBSUB_ENDPOINT_SERIALIZER,
+                       PUBSUB_ENDPOINT_TOPIC_NAME,
+                       PUBSUB_ENDPOINT_TOPIC_SCOPE,
+                       NULL };
+       int i;
+       for (i = 0; keys[i] != NULL; ++i) {
+               const char *val = properties_get(psEp->endpoint_props, keys[i]);
+               if (val == NULL) { //missing required key
+                       fprintf(stderr, "[ERROR] PSD: Invalid endpoint missing 
key: '%s'\n", keys[i]);
+                       valid = false;
+               }
+       }
+       if (!valid) {
+               const char *key = NULL;
+               fprintf(stderr, "PubSubEndpoint entries:\n");
+               PROPERTIES_FOR_EACH(psEp->endpoint_props, key) {
+                       fprintf(stderr, "\t'%s' : '%s'\n", key, 
properties_get(psEp->endpoint_props, key));
+               }
+               if (psEp->topic_props != NULL) {
+                       fprintf(stderr, "PubSubEndpoint topic properties 
entries:\n");
+                       PROPERTIES_FOR_EACH(psEp->topic_props, key) {
+                               fprintf(stderr, "\t'%s' : '%s'\n", key, 
properties_get(psEp->topic_props, key));
+                       }
+               }
+       }
+       return valid;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h 
b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
index 676a6ab..3d25b1c 100644
--- a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
+++ b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
@@ -29,12 +29,15 @@
 
 #define FREE_MEM(ptr) if(ptr) {free(ptr); ptr = NULL;}
 
+#define PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY "PUBSUB_ETCD_DISCOVERY_VERBOSE"
+#define PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE false
+
 struct watcher_info {
     etcd_watcher_pt watcher;
     int nr_references;
 };
 
-struct pubsub_discovery {
+typedef struct pubsub_discovery {
        bundle_context_pt context;
 
        celix_thread_mutex_t discoveredPubsMutex;
@@ -47,7 +50,11 @@ struct pubsub_discovery {
        hash_map_pt watchers; //key = topicname, value = struct watcher_info
 
        etcd_writer_pt writer;
-};
+
+       bool verbose;
+} pubsub_discovery_t;
+
+typedef struct pubsub_discovery *pubsub_discovery_pt;
 
 
 celix_status_t pubsub_discovery_create(bundle_context_pt context, 
pubsub_discovery_pt* node_discovery);

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_serializer_json/src/ps_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/src/ps_activator.c 
b/pubsub/pubsub_serializer_json/src/ps_activator.c
index fec5892..32dd1fc 100644
--- a/pubsub/pubsub_serializer_json/src/ps_activator.c
+++ b/pubsub/pubsub_serializer_json/src/ps_activator.c
@@ -28,6 +28,7 @@
 
 #include "bundle_activator.h"
 #include "service_registration.h"
+#include "pubsub_constants.h"
 
 #include "pubsub_serializer_impl.h"
 
@@ -70,7 +71,7 @@ celix_status_t bundleActivator_start(void * userData, 
bundle_context_pt context)
 
                /* Set serializer type */
                properties_pt props = properties_create();
-               
properties_set(props,PUBSUB_SERIALIZER_TYPE_KEY,PUBSUB_SERIALIZER_TYPE);
+               properties_set(props, PUBSUB_SERIALIZER_TYPE_KEY, 
PUBSUB_SERIALIZER_TYPE);
 
                status = bundleContext_registerService(context, 
PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc, props, 
&activator->registration);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/publisher_endpoint_announce.h 
b/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
index bd39fc0..607e83a 100644
--- a/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
+++ b/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
@@ -22,6 +22,10 @@
 
 #include "pubsub_endpoint.h"
 
+
+//TODO refactor to pubsub_endpoint_announce
+//can be used to announce and remove publisher and subscriber endpoints
+
 struct publisher_endpoint_announce {
        void *handle;
        celix_status_t (*announcePublisher)(void *handle, pubsub_endpoint_pt 
pubEP);

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_admin.h 
b/pubsub/pubsub_spi/include/pubsub_admin.h
index f24d825..5379415 100644
--- a/pubsub/pubsub_spi/include/pubsub_admin.h
+++ b/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -32,11 +32,8 @@
 #include "pubsub_common.h"
 #include "pubsub_endpoint.h"
 
-#define PSA_IP         "PSA_IP"
-#define PSA_ITF        "PSA_INTERFACE"
-#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX"
+#include "pubsub_constants.h"
 
-#define PUBSUB_ADMIN_TYPE_KEY  "pubsub_admin.type"
 
 typedef struct pubsub_admin *pubsub_admin_pt;
 
@@ -52,19 +49,20 @@ struct pubsub_admin_service {
        celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* 
scope, char* topic);
        celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* 
scope, char* topic);
 
+       //TODO add match function for subscription service and publication 
listeners, e.g.:
+       //matchPublisherListener(admin, bundle, filter, outScore)
+       //matchSubscriberService(admin, svcRef, outScore)
+
        /* Match principle:
-        * - A full matching pubsub_admin gives 200 points
-        * - A full matching serializer gives 100 points
-        * - If QoS = sample
-        *              - fallback pubsub_admin order of selection is: udp_mc, 
zmq. Points allocation is 100,75.
-        *              - fallback serializers order of selection is: json, 
void. Points allocation is 30,20.
-        * - If QoS = control
-        *              - fallback pubsub_admin order of selection is: 
zmq,udp_mc. Points allocation is 100,75.
-        *              - fallback serializers order of selection is: json, 
void. Points allocation is 30,20.
-        * - If nothing is specified, QoS = sample is assumed, so the same 
score applies, just divided by two.
-        *
+        * - A full matching pubsub_admin gives 100 points
         */
+       //TODO this should only be called for remote endpoints (e.g. not 
endpoints from this framework
        celix_status_t (*matchEndpoint)(pubsub_admin_pt admin, 
pubsub_endpoint_pt endpoint, double* score);
+
+        //TODO redesign add function for handling endpoint seperate, e.g.: 
+        //addEndpoint(admin, endpoint); 
+        //note that endpoints can be subscribers and publishers
+        //Also note that we than can have pending subscribers and pending 
(subscriber/publisher) endpoints.
 };
 
 typedef struct pubsub_admin_service *pubsub_admin_service_pt;

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_admin_match.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_admin_match.h 
b/pubsub/pubsub_spi/include/pubsub_admin_match.h
index e95ca7d..08d6582 100644
--- a/pubsub/pubsub_spi/include/pubsub_admin_match.h
+++ b/pubsub/pubsub_spi/include/pubsub_admin_match.h
@@ -31,10 +31,17 @@
 #define QOS_TYPE_SAMPLE                "sample"        /* A.k.a. unreliable 
connection */
 #define QOS_TYPE_CONTROL       "control"       /* A.k.a. reliable connection */
 
-#define PUBSUB_ADMIN_FULL_MATCH_SCORE  200.0F
-#define SERIALIZER_FULL_MATCH_SCORE            100.0F
-
-celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char 
*pubsub_admin_type, array_list_pt serializerList, double *score);
-celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, 
array_list_pt serializerList, pubsub_serializer_service_t **serSvc);
+#define PUBSUB_ADMIN_FULL_MATCH_SCORE  100.0F
+
+celix_status_t pubsub_admin_match(
+        pubsub_endpoint_pt endpoint,
+        const char *pubsub_admin_type,
+        const char *frameworkUuid,
+        double sampleScore,
+        double controlScore,
+        double defaultScore,
+        array_list_pt serializerList,
+        double *score);
+celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, 
array_list_pt serializerList, service_reference_pt *out);
 
 #endif /* PUBSUB_ADMIN_MATCH_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_constants.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_constants.h 
b/pubsub/pubsub_spi/include/pubsub_constants.h
new file mode 100644
index 0000000..47e31d3
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_constants.h
@@ -0,0 +1,30 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBSUB_CONSTANTS_H_
+#define PUBSUB_CONSTANTS_H_
+
+#define PSA_IP         "PSA_IP"
+#define PSA_ITF        "PSA_INTERFACE"
+#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX"
+
+#define PUBSUB_ADMIN_TYPE_KEY     "pubsub.config"
+#define PUBSUB_SERIALIZER_TYPE_KEY "pubsub.serializer.type"
+
+#endif /* PUBSUB_CONSTANTS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_endpoint.h 
b/pubsub/pubsub_spi/include/pubsub_endpoint.h
index 598d673..c0492f5 100644
--- a/pubsub/pubsub_spi/include/pubsub_endpoint.h
+++ b/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@ -34,32 +34,48 @@
 #include "pubsub/publisher.h"
 #include "pubsub/subscriber.h"
 
-#define PUBSUB_ENDPOINT_ID              "pubsub.endpoint.id"
-#define PUBSUB_ENDPOINT_SERVICE_ID      "service.id"
-#define PUBSUB_ENDPOINT_SERIALIZER      "serializer"
-#define PUBSUB_ENDPOINT_ADMIN_TYPE      "pubsub.admin.type"
-#define PUBSUB_ENDPOINT_URL             "pubsub.endpoint"
-#define PUBSUB_ENDPOINT_TOPIC           "pubsub.topic"
-#define PUBSUB_ENDPOINT_SCOPE           "pubsub.scope"
-#define PUBSUB_ENDPOINT_TYPE            "pubsub.type"
+#include "pubsub_constants.h"
+
+//required for valid endpoint
+#define PUBSUB_ENDPOINT_UUID            "pubsub.endpoint.uuid" //required
+#define PUBSUB_ENDPOINT_FRAMEWORK_UUID  "pubsub.framework.uuid" //required
+#define PUBSUB_ENDPOINT_TYPE            "pubsub.endpoint.type" 
//PUBSUB_PUBLISHER_ENDPOINT_TYPE or PUBSUB_SUBSCRIBER_ENDPOINT_TYPE
+#define PUBSUB_ENDPOINT_ADMIN_TYPE       PUBSUB_ADMIN_TYPE_KEY
+#define PUBSUB_ENDPOINT_SERIALIZER       PUBSUB_SERIALIZER_TYPE_KEY
+#define PUBSUB_ENDPOINT_TOPIC_NAME      "pubsub.topic.name"
+#define PUBSUB_ENDPOINT_TOPIC_SCOPE     "pubsub.topic.scope"
+
+//optional
+#define PUBSUB_ENDPOINT_SERVICE_ID      "pubsub.service.id"
+#define PUBSUB_ENDPOINT_BUNDLE_ID       "pubsub.bundle.id"
+#define PUBSUB_ENDPOINT_URL             "pubsub.url"
+
+
+#define PUBSUB_PUBLISHER_ENDPOINT_TYPE  "pubsub.publisher"
+#define PUBSUB_SUBSCRIBER_ENDPOINT_TYPE "pubsub.subscriber"
+
 
 struct pubsub_endpoint {
-    long serviceID;         //optional
-    bool is_secure;         //optional
     properties_pt endpoint_props;
     properties_pt topic_props;
 };
 
 typedef struct pubsub_endpoint *pubsub_endpoint_pt;
 
-celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, 
const char* topic, long serviceId,const char* endpoint,properties_pt 
topic_props,pubsub_endpoint_pt* psEp);
-celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt 
reference,pubsub_endpoint_pt* psEp, bool isPublisher);
-celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt 
info,pubsub_endpoint_pt* psEp, bool isPublisher);
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, 
const char* topic, long bundleId, long serviceId, const char* endpoint, const 
char* pubsubType, properties_pt topic_props, pubsub_endpoint_pt* psEp);
+celix_status_t pubsubEndpoint_createFromServiceReference(bundle_context_t* 
ctx, service_reference_pt reference, bool isPublisher, pubsub_endpoint_pt* out);
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t* 
ctx, listener_hook_info_pt info, bool isPublisher, pubsub_endpoint_pt* out);
 celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt 
*out);
-celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp);
+void pubsubEndpoint_destroy(pubsub_endpoint_pt psEp);
 bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2);
 celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, 
const char* value);
 
-char *createScopeTopicKey(const char* scope, const char* topic);
+/**
+ * Creates a pubsub_endpoint based on discovered properties.
+ * Will take ownership over the discovredProperties
+ */
+celix_status_t pubsubEndpoint_createFromDiscoveredProperties(properties_t 
*discoveredProperties, pubsub_endpoint_pt* out);
+
+char * pubsubEndpoint_createScopeTopicKey(const char* scope, const char* 
topic);
 
 #endif /* PUBSUB_ENDPOINT_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_serializer.h 
b/pubsub/pubsub_spi/include/pubsub_serializer.h
index 4489fa4..a91e820 100644
--- a/pubsub/pubsub_spi/include/pubsub_serializer.h
+++ b/pubsub/pubsub_spi/include/pubsub_serializer.h
@@ -32,8 +32,6 @@
 
 #include "pubsub_common.h"
 
-#define PUBSUB_SERIALIZER_TYPE_KEY     "pubsub_serializer.type"
-
 /**
  * There should be a pubsub_serializer_t
  * per msg type (msg id) per bundle

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_utils.h 
b/pubsub/pubsub_spi/include/pubsub_utils.h
index eb961c9..e4c08ec 100644
--- a/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -30,10 +30,8 @@
 #include "bundle_context.h"
 #include "array_list.h"
 
-char* pubsub_getScopeFromFilter(const char* bundle_filter);
-char* pubsub_getTopicFromFilter(const char* bundle_filter);
+celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, const 
char **topic, const char **scope);
 char* pubsub_getKeysBundleDir(bundle_context_pt ctx);
-array_list_pt pubsub_getTopicsFromString(const char* string);
 
 
 #endif /* PUBSUB_UTILS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/src/pubsub_admin_match.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/src/pubsub_admin_match.c 
b/pubsub/pubsub_spi/src/pubsub_admin_match.c
index cfe1dad..5d0fcc9 100644
--- a/pubsub/pubsub_spi/src/pubsub_admin_match.c
+++ b/pubsub/pubsub_spi/src/pubsub_admin_match.c
@@ -19,312 +19,151 @@
 
 
 #include <string.h>
+#include <limits.h>
+
 #include "service_reference.h"
 
 #include "pubsub_admin.h"
 
 #include "pubsub_admin_match.h"
-
-#define KNOWN_PUBSUB_ADMIN_NUM 2
-#define KNOWN_SERIALIZER_NUM   2
-
-static char* qos_sample_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = 
{"udp_mc","zmq"};
-static char* qos_sample_serializer_prio_list[KNOWN_SERIALIZER_NUM] = 
{"json","void"};
-
-static char* qos_control_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = 
{"zmq","udp_mc"};
-static char* qos_control_serializer_prio_list[KNOWN_SERIALIZER_NUM] = 
{"json","void"};
-
-static double qos_pubsub_admin_score[KNOWN_PUBSUB_ADMIN_NUM] = {100.0F,75.0F};
-static double qos_serializer_score[KNOWN_SERIALIZER_NUM] = {30.0F,20.0F};
-
-static void get_serializer_type(service_reference_pt svcRef, char 
**serializerType);
-static void manage_service_from_reference(service_reference_pt svcRef, void 
**svc, bool getService);
-
-celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char 
*pubsub_admin_type, array_list_pt serializerList, double *score){
+#include "constants.h"
+
+/*
+ * Match can be called by
+ * a) a local registered pubsub_subscriber service
+ * b) a local opened service tracker for a pubsub_publisher service
+ * c) a remote found publisher endpoint
+ * Note subscribers are not (yet) dicovered remotely
+ */
+celix_status_t pubsub_admin_match(
+               pubsub_endpoint_pt endpoint,
+               const char *pubsub_admin_type,
+               const char *frameworkUuid,
+               double sampleScore,
+               double controlScore,
+               double defaultScore,
+               array_list_pt serializerList,
+               double *out) {
 
        celix_status_t status = CELIX_SUCCESS;
-       double final_score = 0;
-       int i = 0, j = 0;
+       double score = 0;
+
+       const char *endpointFrameworkUuid               = NULL;
+       const char *endpointAdminType                   = NULL;
 
        const char *requested_admin_type                = NULL;
-       const char *requested_serializer_type   = NULL;
        const char *requested_qos_type                  = NULL;
 
-       if(endpoint_props!=NULL){
-               requested_admin_type            = 
properties_get(endpoint_props,PUBSUB_ADMIN_TYPE_KEY);
-               requested_serializer_type       = 
properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY);
-               requested_qos_type                      = 
properties_get(endpoint_props,QOS_ATTRIBUTE_KEY);
+       if (endpoint->endpoint_props != NULL) {
+               endpointFrameworkUuid = 
properties_get(endpoint->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+               endpointAdminType = properties_get(endpoint->endpoint_props, 
PUBSUB_ENDPOINT_ADMIN_TYPE);
        }
-
-       /* Analyze the pubsub_admin */
-       if(requested_admin_type != NULL){ /* We got precise specification on 
the pubsub_admin we want */
-               
if(strncmp(requested_admin_type,pubsub_admin_type,strlen(pubsub_admin_type))==0){
 //Full match
-                       final_score += PUBSUB_ADMIN_FULL_MATCH_SCORE;
-               }
-       }
-       else if(requested_qos_type != NULL){ /* We got QoS specification that 
will determine the selected PSA */
-               
if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
-                       for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
-                               
if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
-                                       final_score += 
qos_pubsub_admin_score[i];
-                                       break;
-                               }
-                       }
-               }
-               else 
if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
-                       for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
-                               
if(strncmp(qos_control_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
-                                       final_score += 
qos_pubsub_admin_score[i];
-                                       break;
-                               }
-                       }
-               }
-               else{
-                       printf("Unknown QoS type '%s'\n",requested_qos_type);
-                       status = CELIX_ILLEGAL_ARGUMENT;
-               }
-       }
-       else{ /* We got no specification: fallback to Qos=Sample, but count 
half the score */
-               for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
-                       
if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
-                               final_score += (qos_pubsub_admin_score[i]/2);
-                               break;
-                       }
-               }
+       if (endpoint->topic_props != NULL) {
+               requested_admin_type = properties_get(endpoint->topic_props, 
PUBSUB_ADMIN_TYPE_KEY);
+               requested_qos_type = properties_get(endpoint->topic_props, 
QOS_ATTRIBUTE_KEY);
        }
 
-       char *serializer_type = NULL;
-       /* Analyze the serializers */
-       if(requested_serializer_type != NULL){ /* We got precise specification 
on the serializer we want */
-               for(i=0;i<arrayList_size(serializerList);i++){
-                       service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,i);
-                       get_serializer_type(svcRef, &serializer_type);
-                       if(serializer_type != NULL){
-                               
if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){
-                                       final_score += 
SERIALIZER_FULL_MATCH_SCORE;
-                                       break;
-                               }
-                       }
-               }
-       }
-       else if(requested_qos_type != NULL){ /* We got QoS specification that 
will determine the selected serializer */
-               
if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
-                       bool ser_found = false;
-                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
-                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
-                                       service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
-                                       get_serializer_type(svcRef, 
&serializer_type);
-                                       if(serializer_type != NULL){
-                                               
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
-                                                       ser_found = true;
-                                               }
-                                       }
-                               }
-                               if(ser_found){
-                                       final_score += qos_serializer_score[i];
-                               }
+       if (endpointFrameworkUuid != NULL && frameworkUuid != NULL && 
strncmp(frameworkUuid, endpointFrameworkUuid, 128) == 0) {
+               //match for local subscriber or publisher
+
+               /* Analyze the pubsub_admin */
+               if (requested_admin_type != NULL) { /* We got precise 
specification on the pubsub_admin we want */
+                       if (strncmp(requested_admin_type, pubsub_admin_type, 
strlen(pubsub_admin_type)) == 0) { //Full match
+                               score = PUBSUB_ADMIN_FULL_MATCH_SCORE;
                        }
-               }
-               else 
if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
-                       bool ser_found = false;
-                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
-                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
-                                       service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
-                                       get_serializer_type(svcRef, 
&serializer_type);
-                                       if(serializer_type != NULL){
-                                               
if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
-                                                       ser_found = true;
-                                               }
-                                       }
-                               }
-                               if(ser_found){
-                                       final_score += qos_serializer_score[i];
-                               }
+               } else if (requested_qos_type != NULL) { /* We got QoS 
specification that will determine the selected PSA */
+                       if (strncmp(requested_qos_type, QOS_TYPE_SAMPLE, 
strlen(QOS_TYPE_SAMPLE)) == 0) {
+                               score = sampleScore;
+                       } else if (strncmp(requested_qos_type, 
QOS_TYPE_CONTROL, strlen(QOS_TYPE_CONTROL)) == 0) {
+                               score += controlScore;
+                       } else {
+                               printf("Unknown QoS type '%s'\n", 
requested_qos_type);
+                               status = CELIX_ILLEGAL_ARGUMENT;
                        }
+               } else { /* We got no specification: fallback to default score 
*/
+                       score = defaultScore;
                }
-               else{
-                       printf("Unknown QoS type '%s'\n",requested_qos_type);
-                       status = CELIX_ILLEGAL_ARGUMENT;
+
+               //NOTE serializer influence the score if a specific serializer 
is configured and not available.
+               //get best serializer. This is based on service raking or 
requested serializer. In the case of a request NULL is return if not request 
match is found.
+               service_reference_pt serSvcRef = NULL;
+               pubsub_admin_get_best_serializer(endpoint->topic_props, 
serializerList, &serSvcRef);
+               const char *serType = NULL; //for printing info
+               if (serSvcRef == NULL) {
+                       score = 0;
+               } else {
+                       serviceReference_getProperty(serSvcRef, 
PUBSUB_SERIALIZER_TYPE_KEY, &serType);
                }
-       }
-       else{ /* We got no specification: fallback to Qos=Sample, but count 
half the score */
-               bool ser_found = false;
-               for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
-                       for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
-                               service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
-                               get_serializer_type(svcRef, &serializer_type);
-                               if(serializer_type != NULL){
-                                       
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
-                                               ser_found = true;
-                                       }
-                               }
-                       }
-                       if(ser_found){
-                               final_score += (qos_serializer_score[i]/2);
-                       }
+
+               printf("Score for psa type %s is %f. Serializer used is 
'%s'\n", pubsub_admin_type, score, serType);
+       } else {
+               //remote publisher. score will be 0 or 100. nothing else.
+               //TODO FIXME remote publisher should go through a different 
process. Currently it is confusing what to match
+               if (endpointAdminType == NULL) {
+                       score = 0;
+
+//                     const char *key = NULL;
+//                     printf("Endpoint properties:\n");
+//                     PROPERTIES_FOR_EACH(endpoint->endpoint_props, key) {
+//                             printf("\t%s=%s\n", key, 
properties_get(endpoint->endpoint_props, key));
+//                     }
+
+                       fprintf(stderr, "WARNING PSA MATCH: remote publisher 
has no type. The key '%s' must be specified\n", PUBSUB_ENDPOINT_ADMIN_TYPE);
+               } else  {
+                       score = strncmp(endpointAdminType, pubsub_admin_type, 
1024) == 0 ? 100 : 0;
                }
+               printf("Score for psa type %s is %f. Publisher is remote\n", 
pubsub_admin_type, score);
        }
 
-       *score = final_score;
 
-       printf("Score for pair <%s,%s> = 
%f\n",pubsub_admin_type,serializer_type,final_score);
+       *out = score;
 
        return status;
 }
 
-celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, 
array_list_pt serializerList, pubsub_serializer_service_t **serSvc){
+celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, 
array_list_pt serializerList, service_reference_pt *out){
        celix_status_t status = CELIX_SUCCESS;
-
-       int i = 0, j = 0;
-
+       int i;
        const char *requested_serializer_type = NULL;
-       const char *requested_qos_type = NULL;
 
        if (endpoint_props != NULL){
                requested_serializer_type = 
properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY);
-               requested_qos_type = 
properties_get(endpoint_props,QOS_ATTRIBUTE_KEY);
        }
 
        service_reference_pt svcRef = NULL;
-       void *svc = NULL;
-
-       /* Analyze the serializers */
-       if (arrayList_size(serializerList) == 1) {
-               // Only one serializer, use this one
-               svcRef = (service_reference_pt)arrayList_get(serializerList,0);
-               manage_service_from_reference(svcRef, &svc, true);
-               *serSvc = svc;
-               char *serializer_type = NULL;
-               get_serializer_type(svcRef, &serializer_type);
-               printf("Selected the only serializer available. Type = %s\n", 
serializer_type);
-
-       }
-       else if(requested_serializer_type != NULL){ /* We got precise 
specification on the serializer we want */
-               for(i=0;i<arrayList_size(serializerList);i++){
-                       svcRef = 
(service_reference_pt)arrayList_get(serializerList,i);
-                       char *serializer_type = NULL;
-                       get_serializer_type(svcRef, &serializer_type);
-                       if(serializer_type != NULL){
-                               
if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){
-                                       manage_service_from_reference(svcRef, 
&svc,true);
-                                       if(svc==NULL){
-                                               printf("Cannot get 
pubsub_serializer_service from serviceReference %p\n",svcRef);
-                                               status = 
CELIX_SERVICE_EXCEPTION;
-                                       }
-                                       *serSvc = svc;
-                                       break;
-                               }
-                       }
-               }
-       }
-       else if(requested_qos_type != NULL){ /* We got QoS specification that 
will determine the selected serializer */
-               
if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
-                       bool ser_found = false;
-                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
-                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
-                                       svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
-                                       char *serializer_type = NULL;
-                                       get_serializer_type(svcRef, 
&serializer_type);
-                                       if(serializer_type != NULL){
-                                               
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
-                                                       
manage_service_from_reference(svcRef, &svc,true);
-                                                       if(svc==NULL){
-                                                               printf("Cannot 
get pubsub_serializer_service from serviceReference %p\n",svcRef);
-                                                               status = 
CELIX_SERVICE_EXCEPTION;
-                                                       }
-                                                       else{
-                                                               *serSvc = svc;
-                                                               ser_found = 
true;
-                                                               
printf("Selected %s serializer as best for 
QoS=%s\n",qos_sample_serializer_prio_list[i],QOS_TYPE_SAMPLE);
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-               }
-               else 
if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
-                       bool ser_found = false;
-                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
-                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
-                                       svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
-                                       char *serializer_type = NULL;
-                                       get_serializer_type(svcRef, 
&serializer_type);
-                                       if(serializer_type != NULL){
-                                               
if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
-                                                       
manage_service_from_reference(svcRef, &svc,true);
-                                                       if(svc==NULL){
-                                                               printf("Cannot 
get pubsub_serializer_service from serviceReference %p\n",svcRef);
-                                                               status = 
CELIX_SERVICE_EXCEPTION;
-                                                       }
-                                                       else{
-                                                               *serSvc = svc;
-                                                               ser_found = 
true;
-                                                               
printf("Selected %s serializer as best for 
QoS=%s\n",qos_control_serializer_prio_list[i],QOS_TYPE_CONTROL);
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-               }
-               else{
-                       printf("Unknown QoS type '%s'\n",requested_qos_type);
-                       status = CELIX_ILLEGAL_ARGUMENT;
-               }
-       }
-       else{ /* We got no specification: fallback to Qos=Sample, but count 
half the score */
-               bool ser_found = false;
-               for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
-                       for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
-                               svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
-                               char *serializer_type = NULL;
-                               get_serializer_type(svcRef, &serializer_type);
-                               if(serializer_type != NULL){
-                                       
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
-                                               
manage_service_from_reference(svcRef, &svc,true);
-                                               if(svc==NULL){
-                                                       printf("Cannot get 
pubsub_serializer_service from serviceReference %p\n",svcRef);
-                                                       status = 
CELIX_SERVICE_EXCEPTION;
-                                               }
-                                               else{
-                                                       *serSvc = svc;
-                                                       ser_found = true;
-                                                       printf("Selected %s 
serializer as best without any 
specification\n",qos_sample_serializer_prio_list[i]);
-                                               }
-                                       }
-                               }
+    service_reference_pt best = NULL;
+    long hightestRanking = LONG_MIN;
+
+    if (requested_serializer_type != NULL) {
+        for (i = 0; i < arrayList_size(serializerList); ++i) {
+            svcRef = (service_reference_pt) arrayList_get(serializerList, 0);
+                       const char* currentSerType = NULL;
+            serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY, 
&currentSerType);
+            if (currentSerType != NULL && strncmp(requested_serializer_type, 
currentSerType, 128) == 0) {
+                best = svcRef;
+                break;
+            }
+        }
+    } else {
+        //no specific serializer request -> search for highest ranking 
serializer service
+        for (i = 0; i < arrayList_size(serializerList); ++i) {
+            svcRef = (service_reference_pt)arrayList_get(serializerList,0);
+            const char *service_ranking_str  = NULL;
+                       const char* currentSerType = NULL;
+            serviceReference_getProperty(svcRef, 
OSGI_FRAMEWORK_SERVICE_RANKING, &service_ranking_str);
+                       serviceReference_getProperty(svcRef, 
PUBSUB_SERIALIZER_TYPE_KEY, &currentSerType);
+            long svcRanking = service_ranking_str == NULL ? LONG_MIN : 
strtol(service_ranking_str, NULL, 10);
+            if (best == NULL || (svcRanking > hightestRanking && 
currentSerType != NULL)) {
+                best = svcRef;
+                hightestRanking = svcRanking;
+            }
+                       if (currentSerType == NULL) {
+                               fprintf(stderr, "Invalid pubsub_serializer 
service. Must have a property '%s'\n", PUBSUB_SERIALIZER_TYPE_KEY);
                        }
-               }
-       }
-
-       if(svc!=NULL && svcRef!=NULL){
-               manage_service_from_reference(svcRef, svc, false);
-       }
-
-       return status;
-}
+        }
+    }
 
-static void get_serializer_type(service_reference_pt svcRef, char 
**serializerType){
+       *out = best;
 
-       const char *serType = NULL;
-       serviceReference_getProperty(svcRef, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
-       if(serType != NULL){
-               *serializerType = (char*)serType;
-       }
-       else{
-               printf("Serializer serviceReference %p has no 
pubsub_serializer.type property specified\n",svcRef);
-               *serializerType = NULL;
-       }
-}
-
-static void manage_service_from_reference(service_reference_pt svcRef, void 
**svc, bool getService){
-       bundle_context_pt context = NULL;
-       bundle_pt bundle = NULL;
-       serviceReference_getBundle(svcRef, &bundle);
-       bundle_getContext(bundle, &context);
-       if(getService){
-               bundleContext_getService(context, svcRef, svc);
-       }
-       else{
-               bundleContext_ungetService(context, svcRef, NULL);
-       }
-}
+    return status;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/src/pubsub_endpoint.c 
b/pubsub/pubsub_spi/src/pubsub_endpoint.c
index d3b746e..1950433 100644
--- a/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -38,10 +38,11 @@
 #include "pubsub_utils.h"
 
 
-static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* 
fwUUID, const char* scope, const char* topic, long serviceId,const char* 
endpoint,properties_pt topic_props, bool cloneProps);
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* 
fwUUID, const char* scope, const char* topic, long bundleId, long 
serviceId,const char* endpoint, const char *pubsubType, properties_pt 
topic_props);
 static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const 
char *topic, bool isPublisher);
+static bool pubsubEndpoint_isEndpointValid(pubsub_endpoint_pt psEp);
 
-static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* 
fwUUID, const char* scope, const char* topic, long serviceId,const char* 
endpoint,properties_pt topic_props, bool cloneProps){
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* 
fwUUID, const char* scope, const char* topic, long bundleId, long serviceId, 
const char* endpoint, const char *pubsubType, properties_pt topic_props) {
 
        if (psEp->endpoint_props == NULL) {
                psEp->endpoint_props = properties_create();
@@ -52,33 +53,43 @@ static void pubsubEndpoint_setFields(pubsub_endpoint_pt 
psEp, const char* fwUUID
        uuid_t endpointUid;
        uuid_generate(endpointUid);
        uuid_unparse(endpointUid, endpointUuid);
-       properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_ID, endpointUuid);
+
+       properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_UUID, 
endpointUuid);
 
        if (fwUUID != NULL) {
-               properties_set(psEp->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, fwUUID);
+               properties_set(psEp->endpoint_props, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID, fwUUID);
        }
 
        if (scope != NULL) {
-               properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_SCOPE, 
scope);
+               properties_set(psEp->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, scope);
        }
 
        if (topic != NULL) {
-               properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC, 
topic);
+               properties_set(psEp->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME, topic);
        }
 
-       psEp->serviceID = serviceId;
+    char idBuf[32];
+
+    if (bundleId >= 0) {
+        snprintf(idBuf, sizeof(idBuf), "%li", bundleId);
+        properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_BUNDLE_ID, idBuf);
+    }
+
+    if (serviceId >= 0) {
+        snprintf(idBuf, sizeof(idBuf), "%li", bundleId);
+        properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_SERVICE_ID, 
idBuf);
+    }
 
        if(endpoint != NULL) {
                properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_URL, 
endpoint);
        }
 
-       if(topic_props != NULL){
-               if(cloneProps){
-                       properties_copy(topic_props, &(psEp->topic_props));
-               }
-               else{
-                       psEp->topic_props = topic_props;
-               }
+    if (pubsubType != NULL) {
+        properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TYPE, pubsubType);
+    }
+
+       if(topic_props != NULL) {
+        properties_copy(topic_props, &(psEp->topic_props));
        }
 }
 
@@ -129,12 +140,22 @@ celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt 
ep, const char* key, c
        return status;
 }
 
-celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, 
const char* topic, long serviceId,const char* endpoint,properties_pt 
topic_props,pubsub_endpoint_pt* psEp){
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, 
const char* topic, long bundleId,  long serviceId, const char* endpoint, const 
char* pubsubType, properties_pt topic_props,pubsub_endpoint_pt* out){
        celix_status_t status = CELIX_SUCCESS;
 
-       *psEp = calloc(1, sizeof(**psEp));
+    pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp));
 
-       pubsubEndpoint_setFields(*psEp, fwUUID, scope, topic, serviceId, 
endpoint, topic_props, true);
+       pubsubEndpoint_setFields(psEp, fwUUID, scope, topic, bundleId, 
serviceId, endpoint, pubsubType, topic_props);
+
+    if (!pubsubEndpoint_isEndpointValid(psEp)) {
+        status = CELIX_ILLEGAL_STATE;
+    }
+
+    if (status == CELIX_SUCCESS) {
+        *out = psEp;
+    } else {
+        pubsubEndpoint_destroy(psEp);
+    }
 
        return status;
 
@@ -151,9 +172,6 @@ celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, 
pubsub_endpoint_pt *o
         status += properties_copy(in->topic_props, &(ep->topic_props));
     }
 
-       ep->serviceID = in->serviceID;
-       ep->is_secure = in->is_secure;
-
     if (status == CELIX_SUCCESS) {
         *out = ep;
     } else {
@@ -161,23 +179,18 @@ celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt 
in, pubsub_endpoint_pt *o
     }
 
        return status;
-
 }
 
-celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt 
reference, pubsub_endpoint_pt* psEp, bool isPublisher){
+celix_status_t pubsubEndpoint_createFromServiceReference(bundle_context_t 
*ctx, service_reference_pt reference, bool isPublisher, pubsub_endpoint_pt* 
out){
        celix_status_t status = CELIX_SUCCESS;
 
        pubsub_endpoint_pt ep = calloc(1,sizeof(*ep));
 
-       bundle_pt bundle = NULL;
-       bundle_context_pt ctxt = NULL;
        const char* fwUUID = NULL;
-       serviceReference_getBundle(reference,&bundle);
-       bundle_getContext(bundle,&ctxt);
-       bundleContext_getProperty(ctxt,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+       bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
 
        const char* scope = NULL;
-       serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_SCOPE,&scope);
+       serviceReference_getPropertyWithDefault(reference, 
PUBSUB_SUBSCRIBER_SCOPE, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, &scope);
 
        const char* topic = NULL;
        serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_TOPIC,&topic);
@@ -185,70 +198,105 @@ celix_status_t 
pubsubEndpoint_createFromServiceReference(service_reference_pt re
        const char* serviceId = NULL;
        
serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId);
 
+
+    long bundleId = -1;
+    bundle_pt bundle = NULL;
+    serviceReference_getBundle(reference, &bundle);
+    if (bundle != NULL) {
+        bundle_getBundleId(bundle, &bundleId);
+    }
+
        /* TODO: is topic_props==NULL a fatal error such that EP cannot be 
created? */
        properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, 
topic, isPublisher);
 
-       pubsubEndpoint_setFields(ep, fwUUID, 
scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, 
strtol(serviceId,NULL,10), NULL, topic_props, false);
+    const char *pubsubType = isPublisher ? PUBSUB_PUBLISHER_ENDPOINT_TYPE : 
PUBSUB_SUBSCRIBER_ENDPOINT_TYPE;
 
-       if (!properties_get(ep->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID) 
||
-                       !ep->serviceID ||
-                       !properties_get(ep->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE) ||
-                       !properties_get(ep->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC)) {
+       pubsubEndpoint_setFields(ep, fwUUID, scope, topic, bundleId, 
strtol(serviceId,NULL,10), NULL, pubsubType, topic_props);
 
-               fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: 
incomplete description!.");
-               status = CELIX_BUNDLE_EXCEPTION;
-               pubsubEndpoint_destroy(ep);
-               *psEp = NULL;
-       }
-       else{
-               *psEp = ep;
-       }
+    if (!pubsubEndpoint_isEndpointValid(ep)) {
+        status = CELIX_ILLEGAL_STATE;
+    }
+
+    if (status == CELIX_SUCCESS) {
+        *out = ep;
+    } else {
+        pubsubEndpoint_destroy(ep);
+    }
 
        return status;
 
 }
 
-celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt 
info,pubsub_endpoint_pt* psEp, bool isPublisher){
+celix_status_t pubsubEndpoint_createFromDiscoveredProperties(properties_t 
*discoveredProperties, pubsub_endpoint_pt* out) {
+    celix_status_t status = CELIX_SUCCESS;
+    pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp));
+    if (psEp != NULL) {
+        psEp->endpoint_props = discoveredProperties;
+    } else {
+        status = CELIX_ENOMEM;
+    }
+
+    if (!pubsubEndpoint_isEndpointValid(psEp)) {
+        status = CELIX_ILLEGAL_STATE;
+    }
+
+    if (status == CELIX_SUCCESS) {
+        *out = psEp;
+    } else {
+        pubsubEndpoint_destroy(psEp);
+    }
+
+    return status;
+}
+
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t 
*ctx, listener_hook_info_pt info, bool isPublisher, pubsub_endpoint_pt* out){
        celix_status_t status = CELIX_SUCCESS;
 
        const char* fwUUID=NULL;
-       
bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+       bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
 
-       if(fwUUID==NULL){
+       if( fwUUID==NULL) {
                return CELIX_BUNDLE_EXCEPTION;
        }
 
-       char* topic = pubsub_getTopicFromFilter(info->filter);
-       if(topic==NULL){
+       const char* topic = NULL;
+       const char* scope = NULL;
+       pubsub_getPubSubInfoFromFilter(info->filter, &topic, &scope);
+
+       if (topic==NULL) {
                return CELIX_BUNDLE_EXCEPTION;
        }
-
-       *psEp = calloc(1, sizeof(**psEp));
-
-       char* scope = pubsub_getScopeFromFilter(info->filter);
        if(scope == NULL) {
                scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
        }
 
+    pubsub_endpoint_pt psEp = calloc(1, sizeof(**out));
+
        bundle_pt bundle = NULL;
        long bundleId = -1;
        bundleContext_getBundle(info->context,&bundle);
-
        bundle_getBundleId(bundle,&bundleId);
 
        properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, 
topic, isPublisher);
 
        /* TODO: is topic_props==NULL a fatal error such that EP cannot be 
created? */
-       pubsubEndpoint_setFields(*psEp, fwUUID, 
scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, bundleId, NULL, 
topic_props, false);
+       pubsubEndpoint_setFields(psEp, fwUUID, scope, topic, bundleId, -1, 
NULL, PUBSUB_PUBLISHER_ENDPOINT_TYPE, topic_props);
 
-       free(topic);
-       free(scope);
+    if (!pubsubEndpoint_isEndpointValid(psEp)) {
+        status = CELIX_ILLEGAL_STATE;
+    }
 
+    if (status == CELIX_SUCCESS) {
+        *out = psEp;
+    } else {
+        pubsubEndpoint_destroy(psEp);
+    }
 
        return status;
 }
 
-celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
+void pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
+    if (psEp == NULL) return;
 
        if(psEp->topic_props != NULL){
                properties_destroy(psEp->topic_props);
@@ -260,23 +308,53 @@ celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt 
psEp){
 
        free(psEp);
 
-       return CELIX_SUCCESS;
+       return;
 
 }
 
 bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
 
-       return ((strcmp(properties_get(psEp1->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),properties_get(psEp2->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID))==0) &&
-                       (strcmp(properties_get(psEp1->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),properties_get(psEp2->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE))==0) &&
-                       (strcmp(properties_get(psEp1->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),properties_get(psEp2->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC))==0) &&
-                       (psEp1->serviceID == psEp2->serviceID) /*&&
-                       ((psEp1->endpoint==NULL && 
psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/
-       );
+       return strcmp(properties_get(psEp1->endpoint_props, 
PUBSUB_ENDPOINT_UUID),properties_get(psEp2->endpoint_props, 
PUBSUB_ENDPOINT_UUID));
 }
 
-char *createScopeTopicKey(const char* scope, const char* topic) {
+char * pubsubEndpoint_createScopeTopicKey(const char* scope, const char* 
topic) {
        char *result = NULL;
        asprintf(&result, "%s:%s", scope, topic);
 
        return result;
 }
+
+
+static bool pubsubEndpoint_isEndpointValid(pubsub_endpoint_pt psEp) {
+    //required properties
+    bool valid = true;
+    static const char* keys[] = {
+        PUBSUB_ENDPOINT_UUID,
+        PUBSUB_ENDPOINT_FRAMEWORK_UUID,
+        PUBSUB_ENDPOINT_TYPE,
+        PUBSUB_ENDPOINT_TOPIC_NAME,
+        PUBSUB_ENDPOINT_TOPIC_SCOPE,
+        NULL };
+    int i;
+    for (i = 0; keys[i] != NULL; ++i) {
+        const char *val = properties_get(psEp->endpoint_props, keys[i]);
+        if (val == NULL) { //missing required key
+            fprintf(stderr, "[ERROR] PubSubEndpoint: Invalid endpoint missing 
key: '%s'\n", keys[i]);
+            valid = false;
+        }
+    }
+    if (!valid) {
+        const char *key = NULL;
+        fprintf(stderr, "PubSubEndpoint entries:\n");
+        PROPERTIES_FOR_EACH(psEp->endpoint_props, key) {
+            fprintf(stderr, "\t'%s' : '%s'\n", key, 
properties_get(psEp->endpoint_props, key));
+        }
+        if (psEp->topic_props != NULL) {
+            fprintf(stderr, "PubSubEndpoint topic properties entries:\n");
+            PROPERTIES_FOR_EACH(psEp->topic_props, key) {
+                fprintf(stderr, "\t'%s' : '%s'\n", key, 
properties_get(psEp->topic_props, key));
+            }
+        }
+    }
+    return valid;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/src/pubsub_utils.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/src/pubsub_utils.c 
b/pubsub/pubsub_spi/src/pubsub_utils.c
index 53bacb8..daf4e9f 100644
--- a/pubsub/pubsub_spi/src/pubsub_utils.c
+++ b/pubsub/pubsub_spi/src/pubsub_utils.c
@@ -42,85 +42,43 @@
 
 #define MAX_KEYBUNDLE_LENGTH 256
 
-char* pubsub_getScopeFromFilter(const char* bundle_filter){
 
-       char* scope = NULL;
-
-       char* filter = strdup(bundle_filter);
-
-       char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS);
-       if(oc!=NULL){
-               oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1;
-               
if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){
-
-                       char* scopes = strstr(filter,PUBSUB_PUBLISHER_SCOPE);
-                       if(scopes!=NULL){
-
-                               scopes+=strlen(PUBSUB_PUBLISHER_SCOPE)+1;
-                               char* bottom=strchr(scopes,')');
-                               *bottom='\0';
-
-                               scope=strdup(scopes);
-                       } else {
-                           scope=strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
+celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, const 
char **topicOut, const char **scopeOut) {
+       celix_status_t status = CELIX_SUCCESS;
+       const char *topic = NULL;
+       const char *scope = NULL;
+       const char *objectClass = NULL;
+       celix_filter_t *filter = filter_create(filterstr);
+       if (filter != NULL) {
+               if (filter->operand == CELIX_FILTER_OPERAND_AND) { //only and 
pubsub filter valid (e.g. (&(objectClass=pubsub_publisher)(topic=exmpl))
+                       array_list_t *attributes = filter->children;
+                       unsigned int i;
+                       unsigned int size = arrayList_size(attributes);
+                       for (i = 0; i < size; ++i) {
+                               filter_t *attr = arrayList_get(attributes, i);
+                               if (attr->operand == 
CELIX_FILTER_OPERAND_EQUAL) {
+                                       if (strncmp(OSGI_FRAMEWORK_OBJECTCLASS, 
attr->attribute, 128) == 0) {
+                                               objectClass = attr->value;
+                                       } else if 
(strncmp(PUBSUB_PUBLISHER_TOPIC, attr->attribute, 128) == 0) {
+                                               topic = attr->value;
+                                       } else if 
(strncmp(PUBSUB_PUBLISHER_SCOPE, attr->attribute, 128) == 0) {
+                                               scope = attr->value;
+                                       }
+                               }
                        }
                }
        }
 
-       free(filter);
-
-       return scope;
-}
-
-char* pubsub_getTopicFromFilter(const char* bundle_filter){
-
-       char* topic = NULL;
-
-       char* filter = strdup(bundle_filter);
-
-       char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS);
-       if(oc!=NULL){
-               oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1;
-               
if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){
-
-                       char* topics = strstr(filter,PUBSUB_PUBLISHER_TOPIC);
-                       if(topics!=NULL){
-
-                               topics+=strlen(PUBSUB_PUBLISHER_TOPIC)+1;
-                               char* bottom=strchr(topics,')');
-                               *bottom='\0';
-
-                               topic=strdup(topics);
-
-                       }
-               }
+       if (topic != NULL && objectClass != NULL && strncmp(objectClass, 
PUBSUB_PUBLISHER_SERVICE_NAME, 128) == 0) {
+               *topicOut = topic;
+               *scopeOut = scope;
+       } else {
+               *topicOut = NULL;
+               *scopeOut = NULL;
        }
-
-       free(filter);
-
-       return topic;
-
+       return status;
 }
 
-array_list_pt pubsub_getTopicsFromString(const char* string){
-
-       array_list_pt topic_list = NULL;
-       arrayList_create(&topic_list);
-
-       char* topics = strdup(string);
-
-       char* topic = strtok(topics,",;|# ");
-       arrayList_add(topic_list,strdup(topic));
-
-       while( (topic = strtok(NULL,",;|# ")) !=NULL){
-               arrayList_add(topic_list,strdup(topic));
-       }
-
-       free(topics);
-
-       return topic_list;
-
-}
 
 /**
  * Loop through all bundles and look for the bundle with the keys inside.

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c 
b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 71a9ad9..5b983d4 100644
--- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -110,6 +110,13 @@ celix_status_t 
pubsub_topologyManager_create(bundle_context_pt context, log_help
        (*manager)->shellCmdService.handle = *manager;
        (*manager)->shellCmdService.executeCommand = shellCommand;
 
+       (*manager)->verbose = PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE;
+       const char *verboseStr = NULL;
+       bundleContext_getProperty(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, 
&verboseStr);
+       if (verboseStr != NULL) {
+               (*manager)->verbose = strncasecmp("true", verboseStr, 
strlen("true")) == 0;
+       }
+
        properties_pt shellProps = properties_create();
        properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "ps_info");
        properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "ps_info");
@@ -182,6 +189,8 @@ celix_status_t pubsub_topologyManager_psaAdded(void * 
handle, service_reference_
        celixThreadMutex_lock(&manager->subscriptionsLock);
        hash_map_iterator_pt subscriptionsIterator = 
hashMapIterator_create(manager->subscriptions);
 
+       //TODO FIXME no matching used, should only add unmatched subscribers ?
+       //NOTE this is a bug which occurs when psa are started after bundles 
that uses the PSA
        while (hashMapIterator_hasNext(subscriptionsIterator)) {
                array_list_pt sub_ep_list = 
hashMapIterator_nextValue(subscriptionsIterator);
                for(i=0;i<arrayList_size(sub_ep_list);i++){
@@ -197,6 +206,8 @@ celix_status_t pubsub_topologyManager_psaAdded(void * 
handle, service_reference_
        status = celixThreadMutex_lock(&manager->publicationsLock);
        hash_map_iterator_pt publicationsIterator = 
hashMapIterator_create(manager->publications);
 
+       //TODO FIXME no matching used, should only add unmatched publications ?
+       //NOTE this is a bug which occurs when psa are started after bundles 
that uses the PSA
        while (hashMapIterator_hasNext(publicationsIterator)) {
                array_list_pt pub_ep_list = 
hashMapIterator_nextValue(publicationsIterator);
                for(i=0;i<arrayList_size(pub_ep_list);i++){
@@ -252,7 +263,7 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * 
handle, service_referenc
                                unsigned int i;
                                for(i=0;i<arrayList_size(pubEP_list);i++){
                                        pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-                                       
if(strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
+                                       
if(strcmp(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
                                                
disc->removePublisher(disc->handle,pubEP);
                                        }
                                }
@@ -297,9 +308,9 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void 
* handle, service_ref
        //subscriber_service_pt subscriber = (subscriber_service_pt)service;
 
        pubsub_endpoint_pt sub = NULL;
-       if(pubsubEndpoint_createFromServiceReference(reference,&sub,false) == 
CELIX_SUCCESS){
+       if(pubsubEndpoint_createFromServiceReference(manager->context, 
reference,false, &sub) == CELIX_SUCCESS){
                celixThreadMutex_lock(&manager->subscriptionsLock);
-               char *sub_key = 
createScopeTopicKey(properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), 
properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+               char *sub_key = 
pubsubEndpoint_createScopeTopicKey(properties_get(sub->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(sub->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME));
 
                array_list_pt sub_list_by_topic = 
hashMap_get(manager->subscriptions,sub_key);
                if(sub_list_by_topic==NULL){
@@ -319,13 +330,13 @@ celix_status_t 
pubsub_topologyManager_subscriberAdded(void * handle, service_ref
                for(j=0;j<arrayList_size(manager->psaList);j++){
                        pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
                        psa->matchEndpoint(psa->admin,sub,&score);
-                       if(score>best_score){ /* We have a new winner! */
+                       if (score > best_score) { /* We have a new winner! */
                                best_score = score;
                                best_psa = psa;
                        }
                }
 
-               if(best_psa != NULL && best_score>0){
+               if (best_psa != NULL && best_score>0) {
                        best_psa->addSubscription(best_psa->admin,sub);
                }
 
@@ -336,7 +347,7 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void 
* handle, service_ref
                        service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
                        publisher_endpoint_announce_pt disc = NULL;
                        bundleContext_getService(manager->context, disc_sr, 
(void**) &disc);
-                       disc->interestedInTopic(disc->handle, 
properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), 
properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+                       disc->interestedInTopic(disc->handle, 
properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), 
properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
                        bundleContext_ungetService(manager->context, disc_sr, 
NULL);
                }
                hashMapIterator_destroy(iter);
@@ -364,7 +375,7 @@ celix_status_t 
pubsub_topologyManager_subscriberRemoved(void * handle, service_r
        pubsub_topology_manager_pt manager = handle;
 
        pubsub_endpoint_pt subcmp = NULL;
-       if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) 
== CELIX_SUCCESS){
+       if(pubsubEndpoint_createFromServiceReference(manager->context, 
reference, false, &subcmp) == CELIX_SUCCESS){
 
                unsigned int j,k;
 
@@ -375,7 +386,7 @@ celix_status_t 
pubsub_topologyManager_subscriberRemoved(void * handle, service_r
                        service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
                        publisher_endpoint_announce_pt disc = NULL;
                        bundleContext_getService(manager->context, disc_sr, 
(void**) &disc);
-                       disc->uninterestedInTopic(disc->handle, 
properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), 
properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+                       disc->uninterestedInTopic(disc->handle, 
properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), 
properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
                        bundleContext_ungetService(manager->context, disc_sr, 
NULL);
                }
                hashMapIterator_destroy(iter);
@@ -384,7 +395,7 @@ celix_status_t 
pubsub_topologyManager_subscriberRemoved(void * handle, service_r
                celixThreadMutex_lock(&manager->subscriptionsLock);
                celixThreadMutex_lock(&manager->psaListLock);
 
-               char *sub_key = 
createScopeTopicKey(properties_get(subcmp->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),properties_get(subcmp->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
+               char *sub_key = 
pubsubEndpoint_createScopeTopicKey(properties_get(subcmp->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subcmp->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME));
                array_list_pt sub_list_by_topic = 
hashMap_get(manager->subscriptions,sub_key);
                free(sub_key);
                if(sub_list_by_topic!=NULL){
@@ -404,7 +415,7 @@ celix_status_t 
pubsub_topologyManager_subscriberRemoved(void * handle, service_r
                                if(arrayList_size(sub_list_by_topic)==0){
                                        
for(k=0;k<arrayList_size(manager->psaList);k++){
                                                pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-                                               
psa->closeAllSubscriptions(psa->admin, (char*) 
properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) 
properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+                                               
psa->closeAllSubscriptions(psa->admin, (char*) 
properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) 
properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
                                        }
                                }
 
@@ -451,7 +462,7 @@ celix_status_t 
pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
                array_list_pt pubEP_list = 
(array_list_pt)hashMapIterator_nextValue(iter);
                for(unsigned int i = 0; i < arrayList_size(pubEP_list); i++) {
                        pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-                       if( (strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0) && 
(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL)){
+                       if( (strcmp(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0) && 
(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL)){
                                status += 
disc->announcePublisher(disc->handle,pubEP);
                        }
                }
@@ -469,7 +480,7 @@ celix_status_t 
pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
                for(i=0;i<arrayList_size(l);i++){
                        pubsub_endpoint_pt subEp = 
(pubsub_endpoint_pt)arrayList_get(l,i);
 
-                       disc->interestedInTopic(disc->handle, 
properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), 
properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+                       disc->interestedInTopic(disc->handle, 
properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), 
properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
                }
        }
        hashMapIterator_destroy(iter);
@@ -517,10 +528,10 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
                listener_hook_info_pt info = arrayList_get(listeners, l_index);
 
                pubsub_endpoint_pt pub = NULL;
-               if(pubsubEndpoint_createFromListenerHookInfo(info, &pub, true) 
== CELIX_SUCCESS){
+               if(pubsubEndpoint_createFromListenerHookInfo(manager->context, 
info, true, &pub) == CELIX_SUCCESS){
 
                        celixThreadMutex_lock(&manager->publicationsLock);
-                       char *pub_key = 
createScopeTopicKey(properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), 
properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+                       char *pub_key = 
pubsubEndpoint_createScopeTopicKey(properties_get(pub->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pub->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME));
                        array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications, pub_key);
                        if(pub_list_by_topic==NULL){
                                arrayList_create(&pub_list_by_topic);
@@ -546,7 +557,7 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
                                }
                        }
 
-                       if(best_psa != NULL && best_score>0){
+                       if (best_psa != NULL && best_score > 0) {
                                status = 
best_psa->addPublication(best_psa->admin,pub);
                                if(status==CELIX_SUCCESS){
                                        
celixThreadMutex_lock(&manager->discoveryListLock);
@@ -585,14 +596,14 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
                listener_hook_info_pt info = arrayList_get(listeners, l_index);
 
                pubsub_endpoint_pt pubcmp = NULL;
-               if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) 
== CELIX_SUCCESS){
+               if(pubsubEndpoint_createFromListenerHookInfo(manager->context, 
info, true, &pubcmp) == CELIX_SUCCESS){
 
 
                        unsigned int j,k;
                        celixThreadMutex_lock(&manager->psaListLock);
                        celixThreadMutex_lock(&manager->publicationsLock);
 
-                       char *pub_key = 
createScopeTopicKey(properties_get(pubcmp->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubcmp->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
+                       char *pub_key = 
pubsubEndpoint_createScopeTopicKey(properties_get(pubcmp->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubcmp->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME));
                        array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
                        if(pub_list_by_topic!=NULL){
                                
for(j=0;j<arrayList_size(pub_list_by_topic);j++){
@@ -625,7 +636,7 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
                                                
if(arrayList_size(pub_list_by_topic)==0){
                                                        
for(k=0;k<arrayList_size(manager->psaList);k++){
                                                                
pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-                                                               
psa->closeAllPublications(psa->admin, (char*) 
properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) 
properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+                                                               
psa->closeAllPublications(psa->admin, (char*) 
properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) 
properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
                                                        }
                                                }
 
@@ -651,16 +662,20 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
 
 celix_status_t pubsub_topologyManager_announcePublisher(void *handle, 
pubsub_endpoint_pt pubEP){
        celix_status_t status = CELIX_SUCCESS;
-       printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, 
ep=%s]\n",
-                  properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
-                  properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
-                  properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    pubsub_topology_manager_pt manager = handle;
+
+    if (manager->verbose) {
+        printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, 
ep=%s]\n",
+               properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME),
+               properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
 
-       pubsub_topology_manager_pt manager = handle;
        celixThreadMutex_lock(&manager->psaListLock);
        celixThreadMutex_lock(&manager->publicationsLock);
 
-       char *pub_key = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
+       char *pub_key = 
pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME));
 
        array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
        if(pub_list_by_topic==NULL){
@@ -672,7 +687,7 @@ celix_status_t 
pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
        /* Shouldn't be any other duplicate, since it's filtered out by the 
discovery */
        pubsub_endpoint_pt p = NULL;
        pubsubEndpoint_clone(pubEP, &p);
-       arrayList_add(pub_list_by_topic,p);
+       arrayList_add(pub_list_by_topic , p);
 
        unsigned int j;
        double score = 0;
@@ -681,14 +696,16 @@ celix_status_t 
pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
 
        for(j=0;j<arrayList_size(manager->psaList);j++){
                pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-               psa->matchEndpoint(psa->admin,p,&score);
-               if(score>best_score){ /* We have a new winner! */
+               psa->matchEndpoint(psa->admin , p, &score);
+               if (score>best_score) { /* We have a new winner! */
                        best_score = score;
                        best_psa = psa;
                }
        }
 
-       if(best_psa != NULL && best_score>0){
+       if(best_psa != NULL && best_score>0) {
+        //TODO FIXME this the same call as used by publisher of service 
trackers. This is confusing.
+        //remote discovered publication can be handle different.
                best_psa->addPublication(best_psa->admin,p);
        }
        else{
@@ -703,20 +720,24 @@ celix_status_t 
pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
 
 celix_status_t pubsub_topologyManager_removePublisher(void *handle, 
pubsub_endpoint_pt pubEP){
        celix_status_t status = CELIX_SUCCESS;
-       printf("PSTM: Publisher removed for topic %s [fwUUID=%s, ep=%s]\n",
-                  properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
-                  properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
-                  properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    pubsub_topology_manager_pt manager = handle;
+
+    if (manager->verbose) {
+        printf("PSTM: Publisher removed for topic %s with scope %s [fwUUID=%s, 
epUUID=%s]\n",
+               properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME),
+               properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID));
+    }
 
-       pubsub_topology_manager_pt manager = handle;
        celixThreadMutex_lock(&manager->psaListLock);
        celixThreadMutex_lock(&manager->publicationsLock);
        unsigned int i;
 
-       char *pub_key = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
+       char *pub_key = 
pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME));
        array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
        if(pub_list_by_topic==NULL){
-               printf("PSTM: ERROR: Cannot find topic for known endpoint 
[%s,%s,%s]. Something is 
inconsistent.\n",pub_key,properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL));
+               printf("PSTM: ERROR: Cannot find topic for known endpoint 
[%s,%s,%s]. Something is 
inconsistent.\n",pub_key,properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID),properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL));
                status = CELIX_ILLEGAL_STATE;
        }
        else{
@@ -744,7 +765,7 @@ celix_status_t pubsub_topologyManager_removePublisher(void 
*handle, pubsub_endpo
 
                                for(i=0;i<arrayList_size(manager->psaList);i++){
                                        pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-                                       psa->closeAllPublications(psa->admin, 
(char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) 
properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+                                       psa->closeAllPublications(psa->admin, 
(char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) 
properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
                                }
                        }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h 
b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index cdcc651..769048d 100644
--- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -37,6 +37,9 @@
 #include "pubsub/publisher.h"
 #include "pubsub/subscriber.h"
 
+       #define PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY             
"PUBSUB_TOPOLOGY_MANAGER_VERBOSE"
+#define PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE                false
+
 
 struct pubsub_topology_manager {
        bundle_context_pt context;
@@ -58,6 +61,8 @@ struct pubsub_topology_manager {
 
 
        log_helper_pt loghelper;
+
+       bool verbose;
 };
 
 typedef struct pubsub_topology_manager *pubsub_topology_manager_pt;

Reply via email to