Repository: celix
Updated Branches:
  refs/heads/develop 83018f069 -> 42545f1a9


http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_serializer_json/private/src/ps_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/ps_activator.c 
b/pubsub/pubsub_serializer_json/private/src/ps_activator.c
index e0e23d4..fec5892 100644
--- a/pubsub/pubsub_serializer_json/private/src/ps_activator.c
+++ b/pubsub/pubsub_serializer_json/private/src/ps_activator.c
@@ -68,7 +68,11 @@ celix_status_t bundleActivator_start(void * userData, 
bundle_context_pt context)
                pubsubSerializerSvc->destroySerializerMap = 
(void*)pubsubSerializer_destroySerializerMap;
                activator->serializerService = pubsubSerializerSvc;
 
-               status = bundleContext_registerService(context, 
PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc, NULL, &activator->registration);
+               /* Set serializer type */
+               properties_pt props = properties_create();
+               
properties_set(props,PUBSUB_SERIALIZER_TYPE_KEY,PUBSUB_SERIALIZER_TYPE);
+
+               status = bundleContext_registerService(context, 
PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc, props, 
&activator->registration);
 
        }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c 
b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
index fb46f10..cffc816 100644
--- a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
+++ b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
@@ -79,206 +79,217 @@ celix_status_t 
pubsubSerializer_destroy(pubsub_serializer_t* serializer) {
        return status;
 }
 
-celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* 
serializer, bundle_pt bundle, pubsub_msg_serializer_map_t** out) {
-    celix_status_t status = CELIX_SUCCESS;
-    pubsub_msg_serializer_map_t* map = calloc(1, sizeof(*map));
-    if (map != NULL) {
-        map->bundle = bundle;
-        map->serializers = hashMap_create(NULL, NULL, NULL, NULL);
-        pubsubSerializer_fillMsgSerializerMap(map->serializers, bundle);
-    } else {
-        logHelper_log(serializer->loghelper, OSGI_LOGSERVICE_ERROR, "Cannot 
allocate memory for msg map");
-        status = CELIX_ENOMEM;
-    }
-
-    if (status == CELIX_SUCCESS) {
-        *out = map;
-    }
-    return status;
+celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* 
serializer, bundle_pt bundle, hash_map_pt* serializerMap) {
+       celix_status_t status = CELIX_SUCCESS;
+
+       hash_map_pt map = hashMap_create(NULL, NULL, NULL, NULL);
+
+       if (map != NULL) {
+               pubsubSerializer_fillMsgSerializerMap(map, bundle);
+       } else {
+               logHelper_log(serializer->loghelper, OSGI_LOGSERVICE_ERROR, 
"Cannot allocate memory for msg map");
+               status = CELIX_ENOMEM;
+       }
+
+       if (status == CELIX_SUCCESS) {
+               *serializerMap = map;
+       }
+       return status;
 }
 
-celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* 
serializer, pubsub_msg_serializer_map_t* map) {
-    celix_status_t status = CELIX_SUCCESS;
-    if (map == NULL) {
-        return status;
-    }
-
-    hash_map_iterator_t iter = hashMapIterator_construct(map->serializers);
-    while (hashMapIterator_hasNext(&iter)) {
-        pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
-        pubsub_msg_serializer_impl_t* impl = msgSer->handle;
-        dynMessage_destroy(impl->dynMsg); //note msgSer->name and 
msgSer->version owned by dynType
-        free(impl); //also contains the service struct.
-    }
-    hashMap_destroy(map->serializers, false, false);
-    free(map);
-    return status;
+celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* 
serializer, hash_map_pt serializerMap) {
+       celix_status_t status = CELIX_SUCCESS;
+       if (serializerMap == NULL) {
+               return CELIX_ILLEGAL_ARGUMENT;
+       }
+
+       hash_map_iterator_t iter = hashMapIterator_construct(serializerMap);
+       while (hashMapIterator_hasNext(&iter)) {
+               pubsub_msg_serializer_t* msgSerializer = 
hashMapIterator_nextValue(&iter);
+               dyn_message_type *dynMsg = 
(dyn_message_type*)msgSerializer->handle;
+               dynMessage_destroy(dynMsg); //note msgSer->name and 
msgSer->version owned by dynType
+               free(msgSerializer); //also contains the service struct.
+       }
+
+       hashMap_destroy(serializerMap, false, false);
+
+       return status;
 }
 
 
-celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_impl_t* 
impl, const void* msg, char** out, size_t *outLen) {
-    celix_status_t status = CELIX_SUCCESS;
+celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_t* 
msgSerializer, const void* msg, void** out, size_t *outLen) {
+       celix_status_t status = CELIX_SUCCESS;
+
+       char *jsonOutput = NULL;
+       dyn_type* dynType = NULL;
+       dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle;
+       dynMessage_getMessageType(dynMsg, &dynType);
 
-    char *jsonOutput = NULL;
-    dyn_type* dynType = NULL;
-    dynMessage_getMessageType(impl->dynMsg, &dynType);
-       int rc = jsonSerializer_serialize(dynType, msg, &jsonOutput);
-       if (rc != 0){
+       if (jsonSerializer_serialize(dynType, msg, &jsonOutput) != 0){
                status = CELIX_BUNDLE_EXCEPTION;
        }
-    if (status == CELIX_SUCCESS) {
-        *out = jsonOutput;
-        *outLen = strlen(jsonOutput) + 1;
-    }
+
+       if (status == CELIX_SUCCESS) {
+               *out = jsonOutput;
+               *outLen = strlen(jsonOutput) + 1;
+       }
 
        return status;
 }
 
-celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_impl_t* 
impl, const char* input, size_t inputLen, void **out) {
-    celix_status_t status = CELIX_SUCCESS;
-    void *msg = NULL;
-    dyn_type* dynType = NULL;
-    dynMessage_getMessageType(impl->dynMsg, &dynType);
-    int rc = jsonSerializer_deserialize(dynType, input, &msg);
-    if (rc != 0) {
-        status = CELIX_BUNDLE_EXCEPTION;
-    }
-    else{
-        *out = msg;
-    }
+celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_t* 
msgSerializer, const void* input, size_t inputLen, void **out) {
+
+       celix_status_t status = CELIX_SUCCESS;
+       void *msg = NULL;
+       dyn_type* dynType = NULL;
+       dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle;
+       dynMessage_getMessageType(dynMsg, &dynType);
+
+       if (jsonSerializer_deserialize(dynType, (const char*)input, &msg) != 0) 
{
+               status = CELIX_BUNDLE_EXCEPTION;
+       }
+       else{
+               *out = msg;
+       }
+
        return status;
 }
 
-void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_impl_t* impl, void 
*msg) {
-    dyn_type* dynType = NULL;
-    dynMessage_getMessageType(impl->dynMsg, &dynType);
-    if (dynType != NULL) {
-        dynType_free(dynType, msg);
-    }
+void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_t* msgSerializer, void 
*msg) {
+       dyn_type* dynType = NULL;
+       dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle;
+       dynMessage_getMessageType(dynMsg, &dynType);
+       if (dynType != NULL) {
+               dynType_free(dynType, msg);
+       }
 }
 
 
 static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers, 
bundle_pt bundle) {
-    char* root = NULL;
-    char* metaInfPath = NULL;
+       char* root = NULL;
+       char* metaInfPath = NULL;
 
-    root = pubsubSerializer_getMsgDescriptionDir(bundle);
+       root = pubsubSerializer_getMsgDescriptionDir(bundle);
 
-    if(root != NULL){
-        asprintf(&metaInfPath, "%s/META-INF/descriptors/messages", root);
+       if(root != NULL){
+               asprintf(&metaInfPath, "%s/META-INF/descriptors", root);
 
-        pubsubSerializer_addMsgSerializerFromBundle(root, bundle, 
msgSerializers);
-        pubsubSerializer_addMsgSerializerFromBundle(metaInfPath, bundle, 
msgSerializers);
+               pubsubSerializer_addMsgSerializerFromBundle(root, bundle, 
msgSerializers);
+               pubsubSerializer_addMsgSerializerFromBundle(metaInfPath, 
bundle, msgSerializers);
 
-        free(metaInfPath);
-        free(root);
-    }
+               free(metaInfPath);
+               free(root);
+       }
 }
 
 static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle)
 {
-    char *root = NULL;
+       char *root = NULL;
 
-    bool isSystemBundle = false;
-    bundle_isSystemBundle(bundle, &isSystemBundle);
+       bool isSystemBundle = false;
+       bundle_isSystemBundle(bundle, &isSystemBundle);
 
-    if(isSystemBundle == true) {
-        bundle_context_pt context;
-        bundle_getContext(bundle, &context);
+       if(isSystemBundle == true) {
+               bundle_context_pt context;
+               bundle_getContext(bundle, &context);
 
-        const char *prop = NULL;
+               const char *prop = NULL;
 
-        bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
+               bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, 
&prop);
 
-        if(prop != NULL) {
-            root = strdup(prop);
-        } else {
-            root = getcwd(NULL, 0);
-        }
-    } else {
-        bundle_getEntry(bundle, ".", &root);
-    }
+               if(prop != NULL) {
+                       root = strdup(prop);
+               } else {
+                       root = getcwd(NULL, 0);
+               }
+       } else {
+               bundle_getEntry(bundle, ".", &root);
+       }
 
-    return root;
+       return root;
 }
 
 
 static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, 
bundle_pt bundle, hash_map_pt msgSerializers)
 {
-    char path[128];
-    struct dirent *entry = NULL;
-    DIR *dir = opendir(root);
-
-    if(dir) {
-        entry = readdir(dir);
-    }
-
-    while (entry != NULL) {
-
-        if (strstr(entry->d_name, ".descriptor") != NULL) {
-
-            printf("DMU: Parsing entry '%s'\n", entry->d_name);
-
-            memset(path,0,128);
-            snprintf(path, 128, "%s/%s", root, entry->d_name);
-            FILE *stream = fopen(path,"r");
-
-            if (stream != NULL){
-                dyn_message_type* msgType = NULL;
-
-                int rc = dynMessage_parse(stream, &msgType);
-                if (rc == 0 && msgType != NULL) {
-
-                    char* msgName = NULL;
-                    rc += dynMessage_getName(msgType,&msgName);
-
-                    version_pt msgVersion = NULL;
-                    rc += dynMessage_getVersion(msgType, &msgVersion);
-
-                    if(rc == 0 && msgName != NULL && msgVersion != NULL){
-
-                       unsigned int msgId = utils_stringHash(msgName);
-
-                       pubsub_msg_serializer_impl_t* impl = calloc(1, 
sizeof(*impl));
-                       impl->dynMsg = msgType;
-                       impl->msgSerializer.handle = impl;
-                       impl->msgSerializer.msgId = msgId;
-                       impl->msgSerializer.msgName = msgName;
-                       impl->msgSerializer.msgVersion = msgVersion;
-                       impl->msgSerializer.serialize = (void*) 
pubsubMsgSerializer_serialize;
-                       impl->msgSerializer.deserialize = (void*) 
pubsubMsgSerializer_deserialize;
-                       impl->msgSerializer.freeMsg = (void*) 
pubsubMsgSerializer_freeMsg;
-
-                       bool clash = hashMap_containsKey(msgSerializers, 
(void*)(uintptr_t)msgId);
-                       if (clash) {
-                               printf("Cannot add msg %s. clash in msg id 
%d!!\n", msgName, msgId);
-                               free(impl);
-                               dynMessage_destroy(msgType);
-                       } else if (msgId != 0) {
-                               hashMap_put(msgSerializers, 
(void*)(uintptr_t)msgId, &impl->msgSerializer);
-                       } else {
-                               printf("Error creating msg serializer\n");
-                               free(impl);
-                               dynMessage_destroy(msgType);
-                       }
-                    }
-                    else{
-                       printf("Cannot retrieve name and/or version from 
msg\n");
-                    }
-
-                } else{
-                    printf("DMU: cannot parse message from descriptor 
%s\n.",path);
-                }
-                fclose(stream);
-            }else{
-                printf("DMU: cannot open descriptor file %s\n.",path);
-            }
-
-        }
-        entry = readdir(dir);
-    }
-
-    if(dir) {
-        closedir(dir);
-    }
+       char path[128];
+       struct dirent *entry = NULL;
+       DIR *dir = opendir(root);
+
+       if(dir) {
+               entry = readdir(dir);
+       }
+
+       while (entry != NULL) {
+
+               if (strstr(entry->d_name, ".descriptor") != NULL) {
+
+                       printf("DMU: Parsing entry '%s'\n", entry->d_name);
+
+                       memset(path,0,128);
+                       snprintf(path, 128, "%s/%s", root, entry->d_name);
+                       FILE *stream = fopen(path,"r");
+
+                       if (stream != NULL){
+                               dyn_message_type* msgType = NULL;
+
+                               int rc = dynMessage_parse(stream, &msgType);
+                               if (rc == 0 && msgType != NULL) {
+
+                                       char* msgName = NULL;
+                                       rc += 
dynMessage_getName(msgType,&msgName);
+
+                                       version_pt msgVersion = NULL;
+                                       rc += dynMessage_getVersion(msgType, 
&msgVersion);
+
+                                       if(rc == 0 && msgName != NULL && 
msgVersion != NULL){
+
+                                               unsigned int msgId = 
utils_stringHash(msgName);
+
+                                               pubsub_msg_serializer_t 
*msgSerializer = calloc(1,sizeof(pubsub_msg_serializer_t));
+
+                                               msgSerializer->handle = msgType;
+                                               msgSerializer->msgId = msgId;
+                                               msgSerializer->msgName = 
msgName;
+                                               msgSerializer->msgVersion = 
msgVersion;
+                                               msgSerializer->serialize = 
(void*) pubsubMsgSerializer_serialize;
+                                               msgSerializer->deserialize = 
(void*) pubsubMsgSerializer_deserialize;
+                                               msgSerializer->freeMsg = 
(void*) pubsubMsgSerializer_freeMsg;
+
+                                               bool clash = 
hashMap_containsKey(msgSerializers, (void*)(uintptr_t)msgId);
+                                               if (clash){
+                                                       printf("Cannot add msg 
%s. clash in msg id %d!!\n", msgName, msgId);
+                                                       free(msgSerializer);
+                                                       
dynMessage_destroy(msgType);
+                                               }
+                                               else if (msgId != 0){
+                                                       printf("Adding %u : 
%s\n", msgId, msgName);
+                                                       
hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, msgSerializer);
+                                               }
+                                               else{
+                                                       printf("Error creating 
msg serializer\n");
+                                                       free(msgSerializer);
+                                                       
dynMessage_destroy(msgType);
+                                               }
+
+                                       }
+                                       else{
+                                               printf("Cannot retrieve name 
and/or version from msg\n");
+                                       }
+
+                               } else{
+                                       printf("DMU: cannot parse message from 
descriptor %s\n.",path);
+                               }
+                               fclose(stream);
+                       }else{
+                               printf("DMU: cannot open descriptor file 
%s\n.",path);
+                       }
+
+               }
+               entry = readdir(dir);
+       }
+
+       if(dir) {
+               closedir(dir);
+       }
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git 
a/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h 
b/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
index c7cb100..7614e0c 100644
--- a/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
+++ b/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
@@ -41,9 +41,6 @@
 struct pubsub_topology_manager {
        bundle_context_pt context;
 
-       celix_thread_mutex_t serializerListLock;
-       array_list_pt serializerList;
-
        celix_thread_mutex_t psaListLock;
        array_list_pt psaList;
 
@@ -65,22 +62,14 @@ celix_status_t 
pubsub_topologyManager_create(bundle_context_pt context, log_help
 celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt 
manager);
 celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_pt 
manager);
 
-celix_status_t pubsub_topologyManager_pubsubSerializerAdding(void *handle, 
service_reference_pt reference, void **service);
-celix_status_t pubsub_topologyManager_pubsubSerializerAdded(void *handle, 
service_reference_pt reference, void *service);
-celix_status_t pubsub_topologyManager_pubsubSerializerModified(void *handle, 
service_reference_pt reference, void *service);
-celix_status_t pubsub_topologyManager_pubsubSerializerRemoved(void *handle, 
service_reference_pt reference, void *service);
-
-celix_status_t pubsub_topologyManager_psaAdding(void *handle, 
service_reference_pt reference, void **service);
 celix_status_t pubsub_topologyManager_psaAdded(void *handle, 
service_reference_pt reference, void *service);
 celix_status_t pubsub_topologyManager_psaModified(void *handle, 
service_reference_pt reference, void *service);
 celix_status_t pubsub_topologyManager_psaRemoved(void *handle, 
service_reference_pt reference, void *service);
 
-celix_status_t pubsub_topologyManager_pubsubDiscoveryAdding(void* handle, 
service_reference_pt reference, void** service);
 celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, 
service_reference_pt reference, void* service);
 celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, 
service_reference_pt reference, void* service);
 celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, 
service_reference_pt reference, void* service);
 
-celix_status_t pubsub_topologyManager_subscriberAdding(void * handle, 
service_reference_pt reference, void **service);
 celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, 
service_reference_pt reference, void * service);
 celix_status_t pubsub_topologyManager_subscriberModified(void * handle, 
service_reference_pt reference, void * service);
 celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, 
service_reference_pt reference, void * service);

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pstm_activator.c 
b/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
index c202e7a..0ce2571 100644
--- a/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
+++ b/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
@@ -48,7 +48,6 @@ struct activator {
 
        pubsub_topology_manager_pt manager;
 
-       service_tracker_pt pubsubSerializerTracker;
        service_tracker_pt pubsubDiscoveryTracker;
        service_tracker_pt pubsubAdminTracker;
        service_tracker_pt pubsubSubscribersTracker;
@@ -62,38 +61,19 @@ struct activator {
        log_helper_pt loghelper;
 };
 
-static celix_status_t bundleActivator_createPSSTracker(struct activator 
*activator, service_tracker_pt *tracker);
+
 static celix_status_t bundleActivator_createPSDTracker(struct activator 
*activator, service_tracker_pt *tracker);
 static celix_status_t bundleActivator_createPSATracker(struct activator 
*activator, service_tracker_pt *tracker);
 static celix_status_t bundleActivator_createPSSubTracker(struct activator 
*activator, service_tracker_pt *tracker);
 
 
-static celix_status_t bundleActivator_createPSSTracker(struct activator 
*activator, service_tracker_pt *tracker) {
-       celix_status_t status;
-
-       service_tracker_customizer_pt customizer = NULL;
-
-       status = serviceTrackerCustomizer_create(activator->manager,
-                       pubsub_topologyManager_pubsubSerializerAdding,
-                       pubsub_topologyManager_pubsubSerializerAdded,
-                       pubsub_topologyManager_pubsubSerializerModified,
-                       pubsub_topologyManager_pubsubSerializerRemoved,
-                       &customizer);
-
-       if (status == CELIX_SUCCESS) {
-               status = serviceTracker_create(activator->context, (char *) 
PUBSUB_SERIALIZER_SERVICE, customizer, tracker);
-       }
-
-       return status;
-}
-
 static celix_status_t bundleActivator_createPSDTracker(struct activator 
*activator, service_tracker_pt *tracker) {
        celix_status_t status;
 
        service_tracker_customizer_pt customizer = NULL;
 
        status = serviceTrackerCustomizer_create(activator->manager,
-                       pubsub_topologyManager_pubsubDiscoveryAdding,
+                       NULL,
                        pubsub_topologyManager_pubsubDiscoveryAdded,
                        pubsub_topologyManager_pubsubDiscoveryModified,
                        pubsub_topologyManager_pubsubDiscoveryRemoved,
@@ -112,7 +92,7 @@ static celix_status_t 
bundleActivator_createPSATracker(struct activator *activat
        service_tracker_customizer_pt customizer = NULL;
 
        status = serviceTrackerCustomizer_create(activator->manager,
-                       pubsub_topologyManager_psaAdding,
+                       NULL,
                        pubsub_topologyManager_psaAdded,
                        pubsub_topologyManager_psaModified,
                        pubsub_topologyManager_psaRemoved,
@@ -131,7 +111,7 @@ static celix_status_t 
bundleActivator_createPSSubTracker(struct activator *activ
        service_tracker_customizer_pt customizer = NULL;
 
        status = serviceTrackerCustomizer_create(activator->manager,
-                       pubsub_topologyManager_subscriberAdding,
+                       NULL,
                        pubsub_topologyManager_subscriberAdded,
                        pubsub_topologyManager_subscriberModified,
                        pubsub_topologyManager_subscriberRemoved,
@@ -163,36 +143,15 @@ celix_status_t bundleActivator_create(bundle_context_pt 
context, void **userData
        if (status == CELIX_SUCCESS) {
                status = bundleActivator_createPSDTracker(activator, 
&activator->pubsubDiscoveryTracker);
                if (status == CELIX_SUCCESS) {
-                       status = bundleActivator_createPSSTracker(activator, 
&activator->pubsubSerializerTracker);
-                       if (status == CELIX_SUCCESS){
-                               status = 
bundleActivator_createPSATracker(activator, &activator->pubsubAdminTracker);
+                       status = bundleActivator_createPSATracker(activator, 
&activator->pubsubAdminTracker);
+                       if (status == CELIX_SUCCESS) {
+                               status = 
bundleActivator_createPSSubTracker(activator, 
&activator->pubsubSubscribersTracker);
                                if (status == CELIX_SUCCESS) {
-                                       status = 
bundleActivator_createPSSubTracker(activator, 
&activator->pubsubSubscribersTracker);
-                                       if (status == CELIX_SUCCESS) {
-                                               *userData = activator;
-                                       }
-                                       if (status != CELIX_SUCCESS){
-                                               
serviceTracker_destroy(activator->pubsubAdminTracker);
-                                       }
-                               }
-                               if (status != CELIX_SUCCESS){
-                                       
serviceTracker_destroy(activator->pubsubSerializerTracker);
+                                       *userData = activator;
                                }
                        }
-                       if (status != CELIX_SUCCESS){
-                               
serviceTracker_destroy(activator->pubsubDiscoveryTracker);
-                       }
-               }
-               if (status != CELIX_SUCCESS){
-                       pubsub_topologyManager_destroy(activator->manager);
                }
        }
-       if (status != CELIX_SUCCESS){ // an exception occurred so free 
allocated memory
-               logHelper_stop(activator->loghelper);
-               logHelper_destroy(&activator->loghelper);
-               free(activator);
-
-       }
 
        return status;
 }
@@ -224,12 +183,13 @@ celix_status_t bundleActivator_start(void * userData, 
bundle_context_pt context)
        properties_set(props, (char *) OSGI_RSA_SERVICE_EXPORTED_INTERFACES, 
(char *) PUBSUB_TOPIC_INFO_SERVICE);
        status += bundleContext_registerService(context, (char *) 
PUBSUB_TOPIC_INFO_SERVICE, activator->topicInfo, props, 
&activator->topicInfoService);
        */
-
-       status += serviceTracker_open(activator->pubsubSerializerTracker);
        status += serviceTracker_open(activator->pubsubAdminTracker);
+
        status += serviceTracker_open(activator->pubsubDiscoveryTracker);
+
        status += serviceTracker_open(activator->pubsubSubscribersTracker);
 
+
        return status;
 }
 
@@ -239,7 +199,6 @@ celix_status_t bundleActivator_stop(void * userData, 
bundle_context_pt context)
 
        serviceTracker_close(activator->pubsubSubscribersTracker);
        serviceTracker_close(activator->pubsubDiscoveryTracker);
-       serviceTracker_close(activator->pubsubSerializerTracker);
        serviceTracker_close(activator->pubsubAdminTracker);
 
        serviceRegistration_unregister(activator->publisherEPDiscoverService);
@@ -261,7 +220,6 @@ celix_status_t bundleActivator_destroy(void * userData, 
bundle_context_pt contex
 
                serviceTracker_destroy(activator->pubsubSubscribersTracker);
                serviceTracker_destroy(activator->pubsubDiscoveryTracker);
-               serviceTracker_destroy(activator->pubsubSerializerTracker);
                serviceTracker_destroy(activator->pubsubAdminTracker);
 
                logHelper_stop(activator->loghelper);

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git 
a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c 
b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index 0e7923b..b4e8f46 100644
--- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -23,7 +23,6 @@
  *  \author            <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
  *  \copyright Apache License, Version 2.0
  */
-
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -66,15 +65,13 @@ celix_status_t 
pubsub_topologyManager_create(bundle_context_pt context, log_help
        celixThreadMutexAttr_create(&psaAttr);
        celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
        status = celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
-    celixThreadMutexAttr_destroy(&psaAttr);
+       celixThreadMutexAttr_destroy(&psaAttr);
 
-    status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
+       status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
        status = celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
        status = celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
-       status = celixThreadMutex_create(&(*manager)->serializerListLock, NULL);
 
        arrayList_create(&(*manager)->psaList);
-       arrayList_create(&(*manager)->serializerList);
 
        (*manager)->discoveryList = hashMap_create(serviceReference_hashCode, 
NULL, serviceReference_equals2, NULL);
        (*manager)->publications = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
@@ -98,11 +95,6 @@ celix_status_t 
pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
        celixThreadMutex_unlock(&manager->psaListLock);
        celixThreadMutex_destroy(&manager->psaListLock);
 
-       celixThreadMutex_lock(&manager->serializerListLock);
-       arrayList_destroy(manager->serializerList);
-       celixThreadMutex_unlock(&manager->serializerListLock);
-       celixThreadMutex_destroy(&manager->serializerListLock);
-
        celixThreadMutex_lock(&manager->publicationsLock);
        hash_map_iterator_pt pubit = 
hashMapIterator_create(manager->publications);
        while(hashMapIterator_hasNext(pubit)){
@@ -138,24 +130,18 @@ celix_status_t 
pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
        return status;
 }
 
-
-celix_status_t pubsub_topologyManager_psaAdding(void * handle, 
service_reference_pt reference, void **service) {
-       celix_status_t status = CELIX_SUCCESS;
-       pubsub_topology_manager_pt manager = handle;
-
-       status = bundleContext_getService(manager->context, reference, service);
-
-       return status;
-}
-
 celix_status_t pubsub_topologyManager_psaAdded(void * handle, 
service_reference_pt reference, void * service) {
        celix_status_t status = CELIX_SUCCESS;
        pubsub_topology_manager_pt manager = handle;
-       int i, j;
+       int i;
 
-       pubsub_admin_service_pt new_psa = (pubsub_admin_service_pt) service;
+       pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
        logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added 
PSA");
 
+       celixThreadMutex_lock(&manager->psaListLock);
+       arrayList_add(manager->psaList, psa);
+       celixThreadMutex_unlock(&manager->psaListLock);
+
        // Add already detected subscriptions to new PSA
        celixThreadMutex_lock(&manager->subscriptionsLock);
        hash_map_iterator_pt subscriptionsIterator = 
hashMapIterator_create(manager->subscriptions);
@@ -163,27 +149,7 @@ celix_status_t pubsub_topologyManager_psaAdded(void * 
handle, service_reference_
        while (hashMapIterator_hasNext(subscriptionsIterator)) {
                array_list_pt sub_ep_list = 
hashMapIterator_nextValue(subscriptionsIterator);
                for(i=0;i<arrayList_size(sub_ep_list);i++){
-                       pubsub_endpoint_pt sub = 
(pubsub_endpoint_pt)arrayList_get(sub_ep_list,i);
-                       double new_psa_score;
-                       new_psa->matchSubscriber(new_psa->admin, sub, 
&new_psa_score);
-                       pubsub_admin_service_pt best_psa = NULL;
-                       double highest_score = 0;
-
-                       for(j=0;j<arrayList_size(manager->psaList);j++){
-                               pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-                               double score;
-                               psa->matchSubscriber(psa->admin, sub, &score);
-                               if (score > highest_score){
-                                       highest_score = score;
-                                       best_psa = psa;
-                               }
-                       }
-                       if (best_psa != NULL && (new_psa_score > 
highest_score)){
-                               best_psa->removeSubscription(best_psa->admin, 
sub);
-                       }
-                       if (new_psa_score > highest_score){
-                               status += 
new_psa->addSubscription(new_psa->admin, sub);
-                       }
+                       status += psa->addSubscription(psa->admin, 
(pubsub_endpoint_pt)arrayList_get(sub_ep_list,i));
                }
        }
 
@@ -198,27 +164,7 @@ celix_status_t pubsub_topologyManager_psaAdded(void * 
handle, service_reference_
        while (hashMapIterator_hasNext(publicationsIterator)) {
                array_list_pt pub_ep_list = 
hashMapIterator_nextValue(publicationsIterator);
                for(i=0;i<arrayList_size(pub_ep_list);i++){
-                       pubsub_endpoint_pt pub = 
(pubsub_endpoint_pt)arrayList_get(pub_ep_list,i);
-                       double new_psa_score;
-                       new_psa->matchPublisher(new_psa->admin, pub, 
&new_psa_score);
-                       pubsub_admin_service_pt best_psa = NULL;
-                       double highest_score = 0;
-
-                       for(j=0;j<arrayList_size(manager->psaList);j++){
-                               pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-                               double score;
-                               psa->matchPublisher(psa->admin, pub, &score);
-                               if (score > highest_score){
-                                       highest_score = score;
-                                       best_psa = psa;
-                               }
-                       }
-                       if (best_psa != NULL && (new_psa_score > 
highest_score)){
-                               best_psa->removePublication(best_psa->admin, 
pub);
-                       }
-                       if (new_psa_score > highest_score){
-                               status += 
new_psa->addPublication(new_psa->admin, pub);
-                       }
+                       status += psa->addPublication(psa->admin, 
(pubsub_endpoint_pt)arrayList_get(pub_ep_list,i));
                }
        }
 
@@ -226,20 +172,6 @@ celix_status_t pubsub_topologyManager_psaAdded(void * 
handle, service_reference_
 
        celixThreadMutex_unlock(&manager->publicationsLock);
 
-
-       celixThreadMutex_lock(&manager->serializerListLock);
-       unsigned int size = arrayList_size(manager->serializerList);
-       if (size > 0) {
-               pubsub_serializer_service_t* ser = 
arrayList_get(manager->serializerList, (size-1)); //last, same as result of 
add/remove serializer
-               new_psa->setSerializer(new_psa->admin, ser);
-       }
-       celixThreadMutex_unlock(&manager->serializerListLock);
-
-       celixThreadMutex_lock(&manager->psaListLock);
-       arrayList_add(manager->psaList, new_psa);
-       celixThreadMutex_unlock(&manager->psaListLock);
-
-
        return status;
 }
 
@@ -325,97 +257,13 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * 
handle, service_referenc
        return status;
 }
 
-celix_status_t pubsub_topologyManager_pubsubSerializerAdding(void* handle, 
service_reference_pt reference, void** service) {
-       celix_status_t status = CELIX_SUCCESS;
-       pubsub_topology_manager_pt manager = handle;
-
-       bundleContext_getService(manager->context, reference, service);
-
-       return status;
-}
-
-celix_status_t pubsub_topologyManager_pubsubSerializerAdded(void* handle, 
service_reference_pt reference, void* service) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       pubsub_topology_manager_pt manager = handle;
-       pubsub_serializer_service_t* new_serializer = 
(pubsub_serializer_service_t*) service;
-
-       celixThreadMutex_lock(&manager->serializerListLock);
-
-       logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added 
pubsub serializer");
-
-       int i;
-       for(i=0; i<arrayList_size(manager->psaList); i++){
-               pubsub_admin_service_pt psa = (pubsub_admin_service_pt) 
arrayList_get(manager->psaList,i);
-               psa->setSerializer(psa->admin, new_serializer);
-       }
-
-       arrayList_add(manager->serializerList, new_serializer);
-
-       celixThreadMutex_unlock(&manager->serializerListLock);
-
-       return status;
-}
-
-celix_status_t pubsub_topologyManager_pubsubSerializerModified(void * handle, 
service_reference_pt reference, void * service) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       // Nop...
-
-       return status;
-}
-
-celix_status_t pubsub_topologyManager_pubsubSerializerRemoved(void * handle, 
service_reference_pt reference, void * service) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       pubsub_topology_manager_pt manager = handle;
-       pubsub_serializer_service_t* new_serializer = 
(pubsub_serializer_service_t*) service;
-
-       celixThreadMutex_lock(&manager->serializerListLock);
-
-       logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed 
pubsub serializer");
-
-       int i, j;
-
-       for(i=0; i<arrayList_size(manager->psaList); i++){
-               pubsub_admin_service_pt psa = (pubsub_admin_service_pt) 
arrayList_get(manager->psaList,i);
-               psa->removeSerializer(psa->admin, new_serializer);
-       }
-
-       arrayList_removeElement(manager->serializerList, new_serializer);
-
-       if (arrayList_size(manager->serializerList) > 0){
-               //there is another serializer available, change the admin so it 
is using another serializer
-               pubsub_serializer_service_t* replacing_serializer = 
(pubsub_serializer_service_t*) arrayList_get(manager->serializerList,0);
-
-               for(j=0; j<arrayList_size(manager->psaList); j++){
-                       pubsub_admin_service_pt psa = (pubsub_admin_service_pt) 
arrayList_get(manager->psaList,j);
-                       psa->setSerializer(psa->admin, replacing_serializer);
-               }
-       }
-
-       celixThreadMutex_unlock(&manager->serializerListLock);
-
-
-       return status;
-}
-
-celix_status_t pubsub_topologyManager_subscriberAdding(void * handle, 
service_reference_pt reference, void **service) {
-       celix_status_t status = CELIX_SUCCESS;
-       pubsub_topology_manager_pt manager = handle;
-
-       status = bundleContext_getService(manager->context, reference, service);
-
-       return status;
-}
-
 celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, 
service_reference_pt reference, void * service) {
        celix_status_t status = CELIX_SUCCESS;
        pubsub_topology_manager_pt manager = handle;
        //subscriber_service_pt subscriber = (subscriber_service_pt)service;
 
        pubsub_endpoint_pt sub = NULL;
-       if(pubsubEndpoint_createFromServiceReference(reference,&sub) == 
CELIX_SUCCESS){
+       if(pubsubEndpoint_createFromServiceReference(reference,&sub,false) == 
CELIX_SUCCESS){
                celixThreadMutex_lock(&manager->subscriptionsLock);
                char *sub_key = createScopeTopicKey(sub->scope, sub->topic);
 
@@ -430,40 +278,40 @@ celix_status_t 
pubsub_topologyManager_subscriberAdded(void * handle, service_ref
                celixThreadMutex_unlock(&manager->subscriptionsLock);
 
                int j;
-               celixThreadMutex_lock(&manager->psaListLock);
-               double highest_score = -1;
+               double score = 0;
+               double best_score = 0;
                pubsub_admin_service_pt best_psa = NULL;
-
+               celixThreadMutex_lock(&manager->psaListLock);
                for(j=0;j<arrayList_size(manager->psaList);j++){
                        pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-                       double score;
-                       psa->matchSubscriber(psa->admin, sub, &score);
-                       if (score > highest_score){
-                               highest_score = score;
+                       psa->matchEndpoint(psa->admin,sub,&score);
+                       if(score>best_score){ /* We have a new winner! */
+                               best_score = score;
                                best_psa = psa;
                        }
                }
-               if (best_psa != NULL){
+
+               if(best_psa != NULL && best_score>0){
                        best_psa->addSubscription(best_psa->admin,sub);
                }
 
                // Inform discoveries for interest in the topic
-        celixThreadMutex_lock(&manager->discoveryListLock);
+               celixThreadMutex_lock(&manager->discoveryListLock);
                hash_map_iterator_pt iter = 
hashMapIterator_create(manager->discoveryList);
-        while(hashMapIterator_hasNext(iter)){
-            service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
-            publisher_endpoint_announce_pt disc = NULL;
-            bundleContext_getService(manager->context, disc_sr, (void**) 
&disc);
-            disc->interestedInTopic(disc->handle, sub->scope, sub->topic);
-            bundleContext_ungetService(manager->context, disc_sr, NULL);
-        }
-        hashMapIterator_destroy(iter);
-        celixThreadMutex_unlock(&manager->discoveryListLock);
+               while(hashMapIterator_hasNext(iter)){
+                       service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
+                       publisher_endpoint_announce_pt disc = NULL;
+                       bundleContext_getService(manager->context, disc_sr, 
(void**) &disc);
+                       disc->interestedInTopic(disc->handle, sub->scope, 
sub->topic);
+                       bundleContext_ungetService(manager->context, disc_sr, 
NULL);
+               }
+               hashMapIterator_destroy(iter);
+               celixThreadMutex_unlock(&manager->discoveryListLock);
 
                celixThreadMutex_unlock(&manager->psaListLock);
        }
        else{
-               status = CELIX_INVALID_BUNDLE_CONTEXT;
+               status=CELIX_INVALID_BUNDLE_CONTEXT;
        }
 
        return status;
@@ -482,25 +330,25 @@ celix_status_t 
pubsub_topologyManager_subscriberRemoved(void * handle, service_r
        pubsub_topology_manager_pt manager = handle;
 
        pubsub_endpoint_pt subcmp = NULL;
-       if(pubsubEndpoint_createFromServiceReference(reference,&subcmp) == 
CELIX_SUCCESS){
+       if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) 
== CELIX_SUCCESS){
 
                int j,k;
 
                // Inform discoveries that we not interested in the topic any 
more
-        celixThreadMutex_lock(&manager->discoveryListLock);
-        hash_map_iterator_pt iter = 
hashMapIterator_create(manager->discoveryList);
-        while(hashMapIterator_hasNext(iter)){
-            service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
-            publisher_endpoint_announce_pt disc = NULL;
-            bundleContext_getService(manager->context, disc_sr, (void**) 
&disc);
-            disc->uninterestedInTopic(disc->handle, subcmp->scope, 
subcmp->topic);
-            bundleContext_ungetService(manager->context, disc_sr, NULL);
-        }
-        hashMapIterator_destroy(iter);
-        celixThreadMutex_unlock(&manager->discoveryListLock);
-
-        celixThreadMutex_lock(&manager->subscriptionsLock);
-        celixThreadMutex_lock(&manager->psaListLock);
+               celixThreadMutex_lock(&manager->discoveryListLock);
+               hash_map_iterator_pt iter = 
hashMapIterator_create(manager->discoveryList);
+               while(hashMapIterator_hasNext(iter)){
+                       service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
+                       publisher_endpoint_announce_pt disc = NULL;
+                       bundleContext_getService(manager->context, disc_sr, 
(void**) &disc);
+                       disc->uninterestedInTopic(disc->handle, subcmp->scope, 
subcmp->topic);
+                       bundleContext_ungetService(manager->context, disc_sr, 
NULL);
+               }
+               hashMapIterator_destroy(iter);
+               celixThreadMutex_unlock(&manager->discoveryListLock);
+
+               celixThreadMutex_lock(&manager->subscriptionsLock);
+               celixThreadMutex_lock(&manager->psaListLock);
 
                char *sub_key = 
createScopeTopicKey(subcmp->scope,subcmp->topic);
                array_list_pt sub_list_by_topic = 
hashMap_get(manager->subscriptions,sub_key);
@@ -509,19 +357,10 @@ celix_status_t 
pubsub_topologyManager_subscriberRemoved(void * handle, service_r
                        for(j=0;j<arrayList_size(sub_list_by_topic);j++){
                                pubsub_endpoint_pt sub = 
arrayList_get(sub_list_by_topic,j);
                                if(pubsubEndpoint_equals(sub,subcmp)){
-                                       double highest_score = -1;
-                                       pubsub_admin_service_pt best_psa = NULL;
                                        
for(k=0;k<arrayList_size(manager->psaList);k++){
+                                               /* No problem with invoking 
removal on all PSAs; only the one that manages this topic will do something */
                                                pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-                                               double score;
-                                               
psa->matchSubscriber(psa->admin, sub, &score);
-                                               if (score > highest_score){
-                                                       highest_score = score;
-                                                       best_psa = psa;
-                                               }
-                                       }
-                                       if (best_psa != NULL){
-                                               
best_psa->removeSubscription(best_psa->admin,sub);
+                                               
psa->removeSubscription(psa->admin,sub);
                                        }
 
                                }
@@ -547,22 +386,13 @@ celix_status_t 
pubsub_topologyManager_subscriberRemoved(void * handle, service_r
 
        }
        else{
-               status = CELIX_INVALID_BUNDLE_CONTEXT;
+               status=CELIX_INVALID_BUNDLE_CONTEXT;
        }
 
        return status;
 
 }
 
-celix_status_t pubsub_topologyManager_pubsubDiscoveryAdding(void* handle, 
service_reference_pt reference, void** service) {
-       celix_status_t status = CELIX_SUCCESS;
-       pubsub_topology_manager_pt manager = handle;
-
-       bundleContext_getService(manager->context, reference, service);
-
-       return status;
-}
-
 celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, 
service_reference_pt reference, void* service) {
        celix_status_t status = CELIX_SUCCESS;
        pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt)handle;
@@ -600,16 +430,16 @@ celix_status_t 
pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
        iter = hashMapIterator_create(manager->subscriptions);
 
        while(hashMapIterator_hasNext(iter)) {
-           array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter);
-           int i;
-           for(i=0;i<arrayList_size(l);i++){
-               pubsub_endpoint_pt subEp = 
(pubsub_endpoint_pt)arrayList_get(l,i);
+               array_list_pt l = 
(array_list_pt)hashMapIterator_nextValue(iter);
+               int i;
+               for(i=0;i<arrayList_size(l);i++){
+                       pubsub_endpoint_pt subEp = 
(pubsub_endpoint_pt)arrayList_get(l,i);
 
-               disc->interestedInTopic(disc->handle, subEp->scope, 
subEp->topic);
-           }
+                       disc->interestedInTopic(disc->handle, subEp->scope, 
subEp->topic);
+               }
        }
        hashMapIterator_destroy(iter);
-    celixThreadMutex_unlock(&manager->subscriptionsLock);
+       celixThreadMutex_unlock(&manager->subscriptionsLock);
 
        return status;
 }
@@ -654,81 +484,57 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
 
                listener_hook_info_pt info = arrayList_get(listeners, l_index);
 
-               const char* fwUUID=NULL;
-               
bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+               pubsub_endpoint_pt pub = NULL;
+               if(pubsubEndpoint_createFromListenerHookInfo(info, &pub, true) 
== CELIX_SUCCESS){
 
-               char* scope = pubsub_getScopeFromFilter(info->filter);
-               char* topic = pubsub_getTopicFromFilter(info->filter);
-               if(scope == NULL) {
-                       scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
-               }
-
-               //TODO: Can we use a better serviceID??
-               bundle_pt bundle = NULL;
-               long bundleId = -1;
-               bundleContext_getBundle(info->context,&bundle);
-               bundle_getBundleId(bundle,&bundleId);
+                       celixThreadMutex_lock(&manager->publicationsLock);
+                       char *pub_key = createScopeTopicKey(pub->scope, 
pub->topic);
+                       array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications, pub_key);
+                       if(pub_list_by_topic==NULL){
+                               arrayList_create(&pub_list_by_topic);
+                               
hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
+                       }
+                       free(pub_key);
+                       arrayList_add(pub_list_by_topic,pub);
 
-               if(fwUUID !=NULL && topic !=NULL){
+                       celixThreadMutex_unlock(&manager->publicationsLock);
 
-                       pubsub_endpoint_pt pub = NULL;
-                       if(pubsubEndpoint_create(fwUUID, scope, 
topic,bundleId,NULL,&pub) == CELIX_SUCCESS){
+                       int j;
+                       double score = 0;
+                       double best_score = 0;
+                       pubsub_admin_service_pt best_psa = NULL;
+                       celixThreadMutex_lock(&manager->psaListLock);
 
-                               
celixThreadMutex_lock(&manager->publicationsLock);
-                               char *pub_key = createScopeTopicKey(scope, 
topic);
-                               array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications, pub_key);
-                               if(pub_list_by_topic==NULL){
-                                       arrayList_create(&pub_list_by_topic);
-                                       
hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
+                       for(j=0;j<arrayList_size(manager->psaList);j++){
+                               pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
+                               psa->matchEndpoint(psa->admin,pub,&score);
+                               if(score>best_score){ /* We have a new winner! 
*/
+                                       best_score = score;
+                                       best_psa = psa;
                                }
-                               free(pub_key);
-                               arrayList_add(pub_list_by_topic,pub);
-
-                               
celixThreadMutex_unlock(&manager->publicationsLock);
-
-                               int j;
-                               celixThreadMutex_lock(&manager->psaListLock);
-
-                               double highest_score = -1;
-                               pubsub_admin_service_pt best_psa = NULL;
+                       }
 
-                               for(j=0;j<arrayList_size(manager->psaList);j++){
-                                       pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-                                       double score;
-                                       psa->matchPublisher(psa->admin, pub, 
&score);
-                                       if (score > highest_score){
-                                               highest_score = score;
-                                               best_psa = psa;
+                       if(best_psa != NULL && best_score>0){
+                               status = 
best_psa->addPublication(best_psa->admin,pub);
+                               if(status==CELIX_SUCCESS){
+                                       
celixThreadMutex_lock(&manager->discoveryListLock);
+                                       hash_map_iterator_pt iter = 
hashMapIterator_create(manager->discoveryList);
+                                       while(hashMapIterator_hasNext(iter)){
+                                               service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
+                                               publisher_endpoint_announce_pt 
disc = NULL;
+                                               
bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+                                               
disc->announcePublisher(disc->handle,pub);
+                                               
bundleContext_ungetService(manager->context, disc_sr, NULL);
                                        }
+                                       hashMapIterator_destroy(iter);
+                                       
celixThreadMutex_unlock(&manager->discoveryListLock);
                                }
-                               if (best_psa != NULL){
-                                       status = 
best_psa->addPublication(best_psa->admin,pub);
-                                       if(status==CELIX_SUCCESS){
-                                               
celixThreadMutex_lock(&manager->discoveryListLock);
-                                               hash_map_iterator_pt iter = 
hashMapIterator_create(manager->discoveryList);
-                                               
while(hashMapIterator_hasNext(iter)){
-                                                       service_reference_pt 
disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
-                                                       
publisher_endpoint_announce_pt disc = NULL;
-                                                       
bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-                                                       
disc->announcePublisher(disc->handle,pub);
-                                                       
bundleContext_ungetService(manager->context, disc_sr, NULL);
-                                               }
-                                               hashMapIterator_destroy(iter);
-                                               
celixThreadMutex_unlock(&manager->discoveryListLock);
-                                       }
-                               }
-
-                               celixThreadMutex_unlock(&manager->psaListLock);
-
                        }
 
-               }
-               else{
-                       status=CELIX_INVALID_BUNDLE_CONTEXT;
+                       celixThreadMutex_unlock(&manager->psaListLock);
+
                }
 
-               free(topic);
-        free(scope);
        }
 
        return status;
@@ -746,62 +552,41 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
 
                listener_hook_info_pt info = arrayList_get(listeners, l_index);
 
-               char* pub_scope = pubsub_getScopeFromFilter(info->filter);
-               char* pub_topic = pubsub_getTopicFromFilter(info->filter);
-
-               const char* fwUUID=NULL;
-               
bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-
-               //TODO: Can we use a better serviceID??
-               bundle_pt bundle = NULL;
-               long bundleId = -1;
-               bundleContext_getBundle(info->context,&bundle);
-               bundle_getBundleId(bundle,&bundleId);
-
-               if(bundle !=NULL && pub_topic !=NULL && bundleId>0){
-
-                       pubsub_endpoint_pt pubcmp = NULL;
-                       if(pubsubEndpoint_create(fwUUID, pub_scope, 
pub_topic,bundleId,NULL,&pubcmp) == CELIX_SUCCESS){
-
-                               int j,k;
-                celixThreadMutex_lock(&manager->psaListLock);
-                celixThreadMutex_lock(&manager->publicationsLock);
-
-                char *pub_key = createScopeTopicKey(pub_scope, pub_topic);
-                               array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
-                               if(pub_list_by_topic!=NULL){
-                                       
for(j=0;j<arrayList_size(pub_list_by_topic);j++){
-                                               pubsub_endpoint_pt pub = 
arrayList_get(pub_list_by_topic,j);
-                                               
if(pubsubEndpoint_equals(pub,pubcmp)){
-                                                       double highest_score = 
-1;
-                                                       pubsub_admin_service_pt 
best_psa = NULL;
-
-                                                       
for(k=0;k<arrayList_size(manager->psaList);k++){
-                                                               
pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-                                                               double score;
-                                                               
psa->matchPublisher(psa->admin, pub, &score);
-                                                               if (score > 
highest_score){
-                                                                       
highest_score = score;
-                                                                       
best_psa = psa;
+               pubsub_endpoint_pt pubcmp = NULL;
+               if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) 
== CELIX_SUCCESS){
+
+
+                       int j,k;
+                       celixThreadMutex_lock(&manager->psaListLock);
+                       celixThreadMutex_lock(&manager->publicationsLock);
+
+                       char *pub_key = createScopeTopicKey(pubcmp->scope, 
pubcmp->topic);
+                       array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
+                       if(pub_list_by_topic!=NULL){
+                               
for(j=0;j<arrayList_size(pub_list_by_topic);j++){
+                                       pubsub_endpoint_pt pub = 
arrayList_get(pub_list_by_topic,j);
+                                       if(pubsubEndpoint_equals(pub,pubcmp)){
+                                               
for(k=0;k<arrayList_size(manager->psaList);k++){
+                                                       pubsub_admin_service_pt 
psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
+                                                       status = 
psa->removePublication(psa->admin,pub);
+                                                       
if(status==CELIX_SUCCESS){ /* We found the one that manages this endpoint */
+                                                               
celixThreadMutex_lock(&manager->discoveryListLock);
+                                                               
hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
+                                                               
while(hashMapIterator_hasNext(iter)){
+                                                                       
service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
+                                                                       
publisher_endpoint_announce_pt disc = NULL;
+                                                                       
bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+                                                                       
disc->removePublisher(disc->handle,pub);
+                                                                       
bundleContext_ungetService(manager->context, disc_sr, NULL);
                                                                }
+                                                               
hashMapIterator_destroy(iter);
+                                                               
celixThreadMutex_unlock(&manager->discoveryListLock);
                                                        }
-                                                       if (best_psa != NULL){
-                                                               status = 
best_psa->removePublication(best_psa->admin,pub);
-                                                               
if(status==CELIX_SUCCESS){
-                                                                       
celixThreadMutex_lock(&manager->discoveryListLock);
-                                                                       
hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
-                                                                       
while(hashMapIterator_hasNext(iter)){
-                                                                               
service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
-                                                                               
publisher_endpoint_announce_pt disc = NULL;
-                                                                               
bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-                                                                               
disc->removePublisher(disc->handle,pub);
-                                                                               
bundleContext_ungetService(manager->context, disc_sr, NULL);
-                                                                       }
-                                                                       
hashMapIterator_destroy(iter);
-                                                                       
celixThreadMutex_unlock(&manager->discoveryListLock);
-                                                               }
+                                                       else if(status ==  
CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not 
handle this endpoint */
+                                                               status = 
CELIX_SUCCESS;
                                                        }
                                                }
+                                               //}
                                                
arrayList_remove(pub_list_by_topic,j);
 
                                                /* If it was the last publisher 
for this topic, tell PSA to close the ZMQ socket and then inform the discovery 
*/
@@ -813,26 +598,20 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
                                                }
 
                                                pubsubEndpoint_destroy(pub);
-
                                        }
-                               }
 
-                               
celixThreadMutex_unlock(&manager->publicationsLock);
-                               celixThreadMutex_unlock(&manager->psaListLock);
+                               }
+                       }
 
-                               pubsubEndpoint_destroy(pubcmp);
+                       celixThreadMutex_unlock(&manager->publicationsLock);
+                       celixThreadMutex_unlock(&manager->psaListLock);
 
-                               free(pub_key);
+                       free(pub_key);
 
-                       }
+                       pubsubEndpoint_destroy(pubcmp);
 
                }
-               else{
-                       status=CELIX_INVALID_BUNDLE_CONTEXT;
-               }
 
-               free(pub_scope);
-               free(pub_topic);
        }
 
        return status;
@@ -846,8 +625,6 @@ celix_status_t 
pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
        celixThreadMutex_lock(&manager->psaListLock);
        celixThreadMutex_lock(&manager->publicationsLock);
 
-       int i;
-
        char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
 
        array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
@@ -859,23 +636,28 @@ celix_status_t 
pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
 
        /* Shouldn't be any other duplicate, since it's filtered out by the 
discovery */
        pubsub_endpoint_pt p = NULL;
-       
pubsubEndpoint_create(pubEP->frameworkUUID,pubEP->scope,pubEP->topic,pubEP->serviceID,pubEP->endpoint,&p);
+       pubsubEndpoint_clone(pubEP, &p);
        arrayList_add(pub_list_by_topic,p);
 
-       double highest_score = -1;
+       int j;
+       double score = 0;
+       double best_score = 0;
        pubsub_admin_service_pt best_psa = NULL;
 
-       for(i=0;i<arrayList_size(manager->psaList);i++){
-               pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-               double score;
-               psa->matchPublisher(psa->admin, p, &score);
-               if (score > highest_score){
-                       highest_score = score;
+       for(j=0;j<arrayList_size(manager->psaList);j++){
+               pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
+               psa->matchEndpoint(psa->admin,p,&score);
+               if(score>best_score){ /* We have a new winner! */
+                       best_score = score;
                        best_psa = psa;
                }
        }
-       if (best_psa != NULL){
-               status += best_psa->addPublication(best_psa->admin,p);
+
+       if(best_psa != NULL && best_score>0){
+               best_psa->addPublication(best_psa->admin,p);
+       }
+       else{
+               status = CELIX_ILLEGAL_STATE;
        }
 
        celixThreadMutex_unlock(&manager->publicationsLock);
@@ -911,20 +693,10 @@ celix_status_t 
pubsub_topologyManager_removePublisher(void *handle, pubsub_endpo
 
                if(found && p !=NULL){
 
-                       double highest_score = -1;
-                       pubsub_admin_service_pt best_psa = NULL;
-
                        for(i=0;i<arrayList_size(manager->psaList);i++){
                                pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-                               double score;
-                               psa->matchPublisher(psa->admin, p, &score);
-                               if (score > highest_score){
-                                       highest_score = score;
-                                       best_psa = psa;
-                               }
-                       }
-                       if (best_psa != NULL){
-                               status += 
best_psa->removePublication(best_psa->admin,p);
+                               /* No problem with invoking removal on all 
psa's, only the one that manages this topic will do something */
+                               psa->removePublication(psa->admin,p);
                        }
 
                        arrayList_removeElement(pub_list_by_topic,p);
@@ -934,7 +706,7 @@ celix_status_t pubsub_topologyManager_removePublisher(void 
*handle, pubsub_endpo
 
                                for(i=0;i<arrayList_size(manager->psaList);i++){
                                        pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-                                       status += 
psa->closeAllPublications(psa->admin,p->scope, p->topic);
+                                       
psa->closeAllPublications(psa->admin,p->scope, p->topic);
                                }
                        }
 

Reply via email to