Repository: celix Updated Branches: refs/heads/feature/CELIX-454-pubsub-disc [created] b2548c845
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c index bfd038b..aff03c7 100644 --- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c +++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c @@ -40,66 +40,66 @@ #define PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS 30L -static void* pstm_psaHandlingThread(void *data); +static void *pstm_psaHandlingThread(void *data); celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **out) { - celix_status_t status = CELIX_SUCCESS; - - pubsub_topology_manager_t *manager = calloc(1, sizeof(*manager)); - if (manager == NULL) { - *out = NULL; - return CELIX_ENOMEM; - } else { - *out = manager; - } - - manager->context = context; - - celix_thread_mutexattr_t psaAttr; - celixThreadMutexAttr_create(&psaAttr); - celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE); - status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, &psaAttr); - celixThreadMutexAttr_destroy(&psaAttr); - - status |= celixThreadMutex_create(&manager->discoveredEndpoints.mutex, NULL); - status |= celixThreadMutex_create(&manager->announceEndpointListeners.mutex, NULL); - status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL); - status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL); - status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL); - - status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL); - - manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - manager->announceEndpointListeners.list = celix_arrayList_create(); - manager->pubsubadmins.map = hashMap_create(NULL, NULL, NULL, NULL); - manager->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - manager->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - - manager->loghelper = logHelper; - manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE); - - manager->psaHandling.running = true; - celixThread_create(&manager->psaHandling.thread, NULL, pstm_psaHandlingThread, manager); - celixThread_setName(&manager->psaHandling.thread, "PubSub TopologyManager"); - - return status; + celix_status_t status = CELIX_SUCCESS; + + pubsub_topology_manager_t *manager = calloc(1, sizeof(*manager)); + if (manager == NULL) { + *out = NULL; + return CELIX_ENOMEM; + } else { + *out = manager; + } + + manager->context = context; + + celix_thread_mutexattr_t psaAttr; + celixThreadMutexAttr_create(&psaAttr); + celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE); + status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, &psaAttr); + celixThreadMutexAttr_destroy(&psaAttr); + + status |= celixThreadMutex_create(&manager->discoveredEndpoints.mutex, NULL); + status |= celixThreadMutex_create(&manager->announceEndpointListeners.mutex, NULL); + status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL); + status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL); + status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL); + + status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL); + + manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + manager->announceEndpointListeners.list = celix_arrayList_create(); + manager->pubsubadmins.map = hashMap_create(NULL, NULL, NULL, NULL); + manager->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + manager->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + + manager->loghelper = logHelper; + manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE); + + manager->psaHandling.running = true; + celixThread_create(&manager->psaHandling.thread, NULL, pstm_psaHandlingThread, manager); + celixThread_setName(&manager->psaHandling.thread, "PubSub TopologyManager"); + + return status; } celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager) { - celix_status_t status = CELIX_SUCCESS; + celix_status_t status = CELIX_SUCCESS; - celixThreadMutex_lock(&manager->psaHandling.mutex); - manager->psaHandling.running = false; - celixThreadCondition_broadcast(&manager->psaHandling.cond); - celixThreadMutex_unlock(&manager->psaHandling.mutex); - celixThread_join(manager->psaHandling.thread, NULL); + celixThreadMutex_lock(&manager->psaHandling.mutex); + manager->psaHandling.running = false; + celixThreadCondition_broadcast(&manager->psaHandling.cond); + celixThreadMutex_unlock(&manager->psaHandling.mutex); + celixThread_join(manager->psaHandling.thread, NULL); - celixThreadMutex_lock(&manager->pubsubadmins.mutex); - hashMap_destroy(manager->pubsubadmins.map, false, false); - celixThreadMutex_unlock(&manager->pubsubadmins.mutex); - celixThreadMutex_destroy(&manager->pubsubadmins.mutex); + celixThreadMutex_lock(&manager->pubsubadmins.mutex); + hashMap_destroy(manager->pubsubadmins.map, false, false); + celixThreadMutex_unlock(&manager->pubsubadmins.mutex); + celixThreadMutex_destroy(&manager->pubsubadmins.mutex); - celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); + celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map); while (hashMapIterator_hasNext(&iter)) { pstm_discovered_endpoint_entry_t *entry = hashMapIterator_nextValue(&iter); @@ -108,9 +108,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager free(entry); } } - hashMap_destroy(manager->discoveredEndpoints.map, false, false); - celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); - celixThreadMutex_destroy(&manager->discoveredEndpoints.mutex); + hashMap_destroy(manager->discoveredEndpoints.map, false, false); + celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); + celixThreadMutex_destroy(&manager->discoveredEndpoints.mutex); celixThreadMutex_lock(&manager->topicReceivers.mutex); iter = hashMapIterator_construct(manager->topicReceivers.map); @@ -120,6 +120,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager free(entry->scopeAndTopicKey); free(entry->scope); free(entry->topic); + if (entry->topicProperties != NULL) { + celix_properties_destroy(entry->topicProperties); + } if (entry->endpoint != NULL) { celix_properties_destroy(entry->endpoint); } @@ -139,6 +142,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager free(entry->scopeAndTopicKey); free(entry->scope); free(entry->topic); + if (entry->topicProperties != NULL) { + celix_properties_destroy(entry->topicProperties); + } if (entry->endpoint != NULL) { celix_properties_destroy(entry->endpoint); } @@ -151,70 +157,92 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager celixThreadMutex_destroy(&manager->topicSenders.mutex); celixThreadMutex_destroy(&manager->announceEndpointListeners.mutex); - celix_arrayList_destroy(manager->announceEndpointListeners.list); + celix_arrayList_destroy(manager->announceEndpointListeners.list); - free(manager); + free(manager); - return status; + return status; } -void pubsub_topologyManager_psaAdded(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) { - pubsub_topology_manager_t *manager = handle; - pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc; +void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_properties_t *props __attribute__((unused))) { + pubsub_topology_manager_t *manager = handle; + pubsub_admin_service_t *psa = (pubsub_admin_service_t *) svc; + + + long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L); + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Added PSA"); + if (svcId >= 0) { + celixThreadMutex_lock(&manager->pubsubadmins.mutex); + hashMap_put(manager->pubsubadmins.map, (void *) svcId, psa); + celixThreadMutex_unlock(&manager->pubsubadmins.mutex); + } + + //NOTE new psa, so no endpoints announce yet - long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L); - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Added PSA"); + //new PSA -> every topic sender/receiver entry needs a rematch + int needsRematchCount = 0; - if (svcId >= 0) { - celixThreadMutex_lock(&manager->pubsubadmins.mutex); - hashMap_put(manager->pubsubadmins.map, (void*)svcId, psa); - celixThreadMutex_unlock(&manager->pubsubadmins.mutex); - } + celixThreadMutex_lock(&manager->topicSenders.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); + entry->needsMatch = true; + ++needsRematchCount; + } + celixThreadMutex_unlock(&manager->topicSenders.mutex); + celixThreadMutex_lock(&manager->topicReceivers.mutex); + iter = hashMapIterator_construct(manager->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); + entry->needsMatch = true; + ++needsRematchCount; + } + celixThreadMutex_unlock(&manager->topicReceivers.mutex); - //NOTE new psa, so no endpoints announce yet + if (needsRematchCount > 0) { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, + "A PSA is added after at least one active publisher/provided. \ + It is preferred that all PSA are started before publiser/subscriber are started!\n\ + Current topic/sender count is %i", needsRematchCount); + } - /* NOTE for now it assumed PSA / PST and PSD are started before subscribers/publisher - * so no retroactively adding subscribers - * - * TODO future extension? - */ } -void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props) { - pubsub_topology_manager_t *manager = handle; - //pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc; - long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L); +void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) { + pubsub_topology_manager_t *manager = handle; + //pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc; + long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L); - //NOTE psa shutdown will teardown topic receivers / topic senders - //de-setup all topic receivers/senders for the removed psa. - //the next psaHandling run will try to find new psa. + //NOTE psa shutdown will teardown topic receivers / topic senders + //de-setup all topic receivers/senders for the removed psa. + //the next psaHandling run will try to find new psa. celixThreadMutex_lock(&manager->topicSenders.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map); - while (hashMapIterator_hasNext(&iter)) { - pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry->selectedPsaSvcId == svcId) { - /* de-announce all senders */ - if (entry->endpoint != NULL) { - celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); - for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) { - pubsub_announce_endpoint_listener_t *listener; - listener = celix_arrayList_get(manager->announceEndpointListeners.list, j); - listener->removeEndpoint(listener->handle, entry->endpoint); - } - celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); - } - - entry->needsMatch = true; - entry->selectedSerializerSvcId = -1L; - entry->selectedPsaSvcId = -1L; - if (entry->endpoint != NULL) { - celix_properties_destroy(entry->endpoint); + hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry->selectedPsaSvcId == svcId) { + /* de-announce all senders */ + if (entry->endpoint != NULL) { + celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); + for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) { + pubsub_announce_endpoint_listener_t *listener; + listener = celix_arrayList_get(manager->announceEndpointListeners.list, j); + listener->revokeEndpoint(listener->handle, entry->endpoint); + } + celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); + } + + entry->needsMatch = true; + entry->selectedSerializerSvcId = -1L; + entry->selectedPsaSvcId = -1L; + if (entry->endpoint != NULL) { + celix_properties_destroy(entry->endpoint); entry->endpoint = NULL; } - } - } + } + } celixThreadMutex_unlock(&manager->topicSenders.mutex); celixThreadMutex_lock(&manager->topicReceivers.mutex); @@ -222,16 +250,16 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u while (hashMapIterator_hasNext(&iter)) { pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); if (entry->selectedPsaSvcId == svcId) { - /* de-announce all receivers */ - if (entry->endpoint != NULL) { - celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); - for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) { - pubsub_announce_endpoint_listener_t *listener; - listener = celix_arrayList_get(manager->announceEndpointListeners.list, j); - listener->removeEndpoint(listener->handle, entry->endpoint); - } - celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); - } + /* de-announce all receivers */ + if (entry->endpoint != NULL) { + celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); + for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) { + pubsub_announce_endpoint_listener_t *listener; + listener = celix_arrayList_get(manager->announceEndpointListeners.list, j); + listener->revokeEndpoint(listener->handle, entry->endpoint); + } + celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); + } entry->needsMatch = true; entry->selectedSerializerSvcId = -1L; @@ -245,207 +273,206 @@ void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((u celixThreadMutex_unlock(&manager->topicReceivers.mutex); - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Removed PSA"); } -void pubsub_topologyManager_subscriberAdded(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) { - pubsub_topology_manager_t *manager = handle; - - //NOTE new local subscriber service register - //1) First trying to see if a TopicReceiver already exists for this subscriber, if found - //2) update the usage count. if not found - //3) signal psaHandling thread to setup topic receiver - - const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL); - const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); - if (topic == NULL) { - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, - "[PSTM] Warning found subscriber service without mandatory %s property.", - PUBSUB_SUBSCRIBER_TOPIC); - return; - } - - long bndId = celix_bundle_getId(bnd); - char *scopeAndTopicKey = NULL; - scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic); - - celixThreadMutex_lock(&manager->topicReceivers.mutex); - pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey); - if (entry != NULL) { - entry->usageCount += 1; - free(scopeAndTopicKey); - } else { - entry = calloc(1, sizeof(*entry)); - entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship - entry->scope = strndup(scope, 1024 * 1024); - entry->topic = strndup(topic, 1024 * 1024); - entry->usageCount = 1; - entry->selectedPsaSvcId = -1L; - entry->selectedSerializerSvcId = -1L; - entry->needsMatch = true; - entry->bndId = bndId; - entry->subscriberProperties = celix_properties_copy(props); - hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry); - - //signal psa handling thread - celixThreadCondition_broadcast(&manager->psaHandling.cond); - } - celixThreadMutex_unlock(&manager->topicReceivers.mutex); +void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) { + pubsub_topology_manager_t *manager = handle; + + //NOTE new local subscriber service register + //1) First trying to see if a TopicReceiver already exists for this subscriber, if found + //2) update the usage count. if not found + //3) signal psaHandling thread to setup topic receiver + + const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL); + const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); + if (topic == NULL) { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, + "[PSTM] Warning found subscriber service without mandatory %s property.", + PUBSUB_SUBSCRIBER_TOPIC); + return; + } + + long bndId = celix_bundle_getId(bnd); + char *scopeAndTopicKey = NULL; + scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic); + + celixThreadMutex_lock(&manager->topicReceivers.mutex); + pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey); + if (entry != NULL) { + entry->usageCount += 1; + free(scopeAndTopicKey); + } else { + entry = calloc(1, sizeof(*entry)); + entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship + entry->scope = strndup(scope, 1024 * 1024); + entry->topic = strndup(topic, 1024 * 1024); + entry->usageCount = 1; + entry->selectedPsaSvcId = -1L; + entry->selectedSerializerSvcId = -1L; + entry->needsMatch = true; + entry->bndId = bndId; + entry->subscriberProperties = celix_properties_copy(props); + hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry); + + //signal psa handling thread + celixThreadCondition_broadcast(&manager->psaHandling.cond); + } + celixThreadMutex_unlock(&manager->topicReceivers.mutex); } -void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) { - pubsub_topology_manager_t *manager = handle; +void pubsub_topologyManager_subscriberRemoved(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) { + pubsub_topology_manager_t *manager = handle; - //NOTE local subscriber service unregister - //1) Find topic receiver and decrease count + //NOTE local subscriber service unregister + //1) Find topic receiver and decrease count - const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL); - const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); + const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL); + const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); - if (topic == NULL) { - return; - } + if (topic == NULL) { + return; + } - char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic); - celixThreadMutex_lock(&manager->topicReceivers.mutex); - pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey); - if (entry != NULL) { - entry->usageCount -= 0; - } - celixThreadMutex_unlock(&manager->topicReceivers.mutex); - free(scopeAndTopicKey); + char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic); + celixThreadMutex_lock(&manager->topicReceivers.mutex); + pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey); + if (entry != NULL) { + entry->usageCount -= 0; + } + celixThreadMutex_unlock(&manager->topicReceivers.mutex); + free(scopeAndTopicKey); - //NOTE not waking up psaHandling thread, topic receiver does not need to be removed immediately. + //NOTE not waking up psaHandling thread, topic receiver does not need to be removed immediately. } -void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void* handle, void *svc, const celix_properties_t *props __attribute__((unused))) { - pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle; - pubsub_announce_endpoint_listener_t *listener = svc; - - //1) retroactively call announceEndpoint for already existing endpoints (manager->announcedEndpoints) - //2) Add listener to manager->announceEndpointListeners - - celixThreadMutex_lock(&manager->topicSenders.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map); - while (hashMapIterator_hasNext(&iter)) { - pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL && entry->endpoint != NULL) { - listener->announceEndpoint(listener->handle, entry->endpoint); - } - } - celixThreadMutex_unlock(&manager->topicSenders.mutex); - - celixThreadMutex_lock(&manager->topicReceivers.mutex); - iter = hashMapIterator_construct(manager->topicReceivers.map); - while (hashMapIterator_hasNext(&iter)) { - pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL && entry->endpoint != NULL) { - listener->announceEndpoint(listener->handle, entry->endpoint); - } - } - celixThreadMutex_unlock(&manager->topicReceivers.mutex); - - celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); - celix_arrayList_add(manager->announceEndpointListeners.list, listener); - celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); +void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void *handle, void *svc, const celix_properties_t *props __attribute__((unused))) { + pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *) handle; + pubsub_announce_endpoint_listener_t *listener = svc; + + //1) retroactively call announceEndpoint for already existing endpoints (manager->announcedEndpoints) + //2) Add listener to manager->announceEndpointListeners + + celixThreadMutex_lock(&manager->topicSenders.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry != NULL && entry->endpoint != NULL) { + listener->announceEndpoint(listener->handle, entry->endpoint); + } + } + celixThreadMutex_unlock(&manager->topicSenders.mutex); + + celixThreadMutex_lock(&manager->topicReceivers.mutex); + iter = hashMapIterator_construct(manager->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry != NULL && entry->endpoint != NULL) { + listener->announceEndpoint(listener->handle, entry->endpoint); + } + } + celixThreadMutex_unlock(&manager->topicReceivers.mutex); + + celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); + celix_arrayList_add(manager->announceEndpointListeners.list, listener); + celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); } -void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) { - pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle; - pubsub_announce_endpoint_listener_t *listener = svc; +void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void *handle, void *svc, const celix_properties_t *props __attribute__((unused))) { + pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *) handle; + pubsub_announce_endpoint_listener_t *listener = svc; - //1) Remove listener from manager->announceEndpointListeners - //2) call removeEndpoint for already existing endpoints (manager->announcedEndpoints) + //1) Remove listener from manager->announceEndpointListeners + //2) call removeEndpoint for already existing endpoints (manager->announcedEndpoints) - celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); - celix_arrayList_remove(manager->announceEndpointListeners.list, listener); - celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); + celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); + celix_arrayList_remove(manager->announceEndpointListeners.list, listener); + celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); } void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_service_tracker_info_t *info) { - pubsub_topology_manager_t *manager = handle; - - //NOTE new local subscriber service register - //1) First trying to see if a TopicReceiver already exists for this subscriber, if found - //2) update the usage count. if not found - //3) signal psaHandling thread to find a psa and setup TopicSender - - - char *topicFromFilter = NULL; - char *scopeFromFilter = NULL; - pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topicFromFilter, &scopeFromFilter); - char *scope = scopeFromFilter == NULL ? strndup("default", 32) : scopeFromFilter; - char *topic = topicFromFilter; - - char *scopeAndTopicKey = NULL; - if (topic == NULL) { - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, - "[PSTM] Warning found publisher service request without mandatory '%s' filter attribute.", - PUBSUB_SUBSCRIBER_TOPIC); - return; - } - - scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic); - celixThreadMutex_lock(&manager->topicSenders.mutex); - pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey); - if (entry != NULL) { - entry->usageCount += 1; - free(scope); - free(topic); - free(scopeAndTopicKey); - } else { - entry = calloc(1, sizeof(*entry)); - entry->usageCount = 1; - entry->selectedSerializerSvcId = -1L; - entry->selectedPsaSvcId = -1L; - entry->scope = scope; //taking ownership - entry->topic = topic; //taking ownership - entry->scopeAndTopicKey = scopeAndTopicKey; //taking ownership - entry->needsMatch = true; - entry->publisherFilter = celix_filter_create(info->filter->filterStr); - entry->bndId = info->bundleId; - hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry); - - //new entry -> wakeup psaHandling thread - celixThreadCondition_broadcast(&manager->psaHandling.cond); - } - celixThreadMutex_unlock(&manager->topicSenders.mutex); + pubsub_topology_manager_t *manager = handle; + + //NOTE new local subscriber service register + //1) First trying to see if a TopicReceiver already exists for this subscriber, if found + //2) update the usage count. if not found + //3) signal psaHandling thread to find a psa and setup TopicSender + + + char *topicFromFilter = NULL; + char *scopeFromFilter = NULL; + pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topicFromFilter, &scopeFromFilter); + char *scope = scopeFromFilter == NULL ? strndup("default", 32) : scopeFromFilter; + char *topic = topicFromFilter; + + char *scopeAndTopicKey = NULL; + if (topic == NULL) { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, + "[PSTM] Warning found publisher service request without mandatory '%s' filter attribute.", + PUBSUB_SUBSCRIBER_TOPIC); + return; + } + + scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic); + celixThreadMutex_lock(&manager->topicSenders.mutex); + pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey); + if (entry != NULL) { + entry->usageCount += 1; + free(scope); + free(topic); + free(scopeAndTopicKey); + } else { + entry = calloc(1, sizeof(*entry)); + entry->usageCount = 1; + entry->selectedSerializerSvcId = -1L; + entry->selectedPsaSvcId = -1L; + entry->scope = scope; //taking ownership + entry->topic = topic; //taking ownership + entry->scopeAndTopicKey = scopeAndTopicKey; //taking ownership + entry->needsMatch = true; + entry->publisherFilter = celix_filter_create(info->filter->filterStr); + entry->bndId = info->bundleId; + hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry); + + //new entry -> wakeup psaHandling thread + celixThreadCondition_broadcast(&manager->psaHandling.cond); + } + celixThreadMutex_unlock(&manager->topicSenders.mutex); } void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_service_tracker_info_t *info) { - pubsub_topology_manager_t *manager = handle; + pubsub_topology_manager_t *manager = handle; - //NOTE local subscriber service unregister - //1) Find topic sender and decrease count + //NOTE local subscriber service unregister + //1) Find topic sender and decrease count - char *topic = NULL; - char *scopeFromFilter = NULL; - pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter); - const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter; + char *topic = NULL; + char *scopeFromFilter = NULL; + pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter); + const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter; - if (topic == NULL) { - free(scopeFromFilter); - return; - } + if (topic == NULL) { + free(scopeFromFilter); + return; + } - char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic); - celixThreadMutex_lock(&manager->topicSenders.mutex); - pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey); - if (entry != NULL) { - entry->usageCount -= 1; - } - celixThreadMutex_unlock(&manager->topicSenders.mutex); + char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic); + celixThreadMutex_lock(&manager->topicSenders.mutex); + pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey); + if (entry != NULL) { + entry->usageCount -= 1; + } + celixThreadMutex_unlock(&manager->topicSenders.mutex); - free(scopeAndTopicKey); - free(topic); - free(scopeFromFilter); + free(scopeAndTopicKey); + free(topic); + free(scopeFromFilter); } -celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint){ - celix_status_t status = CELIX_SUCCESS; +celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) { + celix_status_t status = CELIX_SUCCESS; pubsub_topology_manager_t *manager = handle; const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL); @@ -455,92 +482,92 @@ celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const // 1) If not, find matching psa using the matchEndpoint // 2) if found call addEndpoint of the matching psa - if (manager->verbose) { - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, - "PSTM: Discovered endpoint added for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n", - celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL), - celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), - celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL), - uuid); - } - - - celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); - pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid); - if (entry != NULL) { - //already existing endpoint -> increase usage - entry->usageCount += 1; - } else { - //new endpoint -> new entry - entry = calloc(1, sizeof(*entry)); - entry->usageCount = 1; - entry->endpoint = celix_properties_copy(endpoint); - entry->uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL); - entry->selectedPsaSvcId = -1L; //NOTE not selected a psa yet - hashMap_put(manager->discoveredEndpoints.map, (void*)entry->uuid, entry); - - //waking up psa handling thread to select psa - celixThreadCondition_broadcast(&manager->psaHandling.cond); - - } - celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); + if (manager->verbose) { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, + "PSTM: Discovered endpoint added for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n", + celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL), + celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), + celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL), + uuid); + } + + + celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); + pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid); + if (entry != NULL) { + //already existing endpoint -> increase usage + entry->usageCount += 1; + } else { + //new endpoint -> new entry + entry = calloc(1, sizeof(*entry)); + entry->usageCount = 1; + entry->endpoint = celix_properties_copy(endpoint); + entry->uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL); + entry->selectedPsaSvcId = -1L; //NOTE not selected a psa yet + hashMap_put(manager->discoveredEndpoints.map, (void *) entry->uuid, entry); + + //waking up psa handling thread to select psa + celixThreadCondition_broadcast(&manager->psaHandling.cond); + + } + celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); return status; } static void pstm_removeEndpointCallback(void *handle, void *svc) { - celix_properties_t *endpoint = handle; - pubsub_admin_service_t *psa = svc; - psa->removeEndpoint(psa->handle, endpoint); + celix_properties_t *endpoint = handle; + pubsub_admin_service_t *psa = svc; + psa->removeDiscoveredEndpoint(psa->handle, endpoint); } celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) { pubsub_topology_manager_t *manager = handle; - // 1) See if endpoint is already discovered, if so decrease usage count. - // 1) If usage count becomes 0, find matching psa using the matchEndpoint - // 2) if found call disconnectEndpoint of the matching psa - - const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL); - assert(uuid != NULL); //discovery should check if endpoint is valid -> pubsubEndoint_isValid. - - if (manager->verbose) { - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, - "PSTM: Discovered endpoint removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n", - celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL), - celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), - celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL), - uuid); - } - - celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); - pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid); - if (entry != NULL) { - //already existing endpoint -> decrease usage - entry->usageCount-= 1; - if (entry->usageCount <= 0) { - hashMap_remove(manager->discoveredEndpoints.map, entry->uuid); - } else { - entry = NULL; //still used (usage count > 0) -> do nothing - } - } - celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); - - if (entry != NULL) { - //note entry is removed from manager->discoveredEndpoints, also inform used psa - if (entry->selectedPsaSvcId >= 0) { - //note that it is possible that the psa is already gone, in that case the call is also not needed anymore. - celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, - (void *) endpoint, pstm_removeEndpointCallback); - } else { - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "No selected psa for endpoint %s\n", entry->uuid); - } - celix_properties_destroy(entry->endpoint); - free(entry); - } - - - return CELIX_SUCCESS; + // 1) See if endpoint is already discovered, if so decrease usage count. + // 1) If usage count becomes 0, find matching psa using the matchEndpoint + // 2) if found call disconnectEndpoint of the matching psa + + const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL); + assert(uuid != NULL); //discovery should check if endpoint is valid -> pubsubEndoint_isValid. + + if (manager->verbose) { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, + "PSTM: Discovered endpoint removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n", + celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL), + celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), + celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL), + uuid); + } + + celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); + pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid); + if (entry != NULL) { + //already existing endpoint -> decrease usage + entry->usageCount -= 1; + if (entry->usageCount <= 0) { + hashMap_remove(manager->discoveredEndpoints.map, entry->uuid); + } else { + entry = NULL; //still used (usage count > 0) -> do nothing + } + } + celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); + + if (entry != NULL) { + //note entry is removed from manager->discoveredEndpoints, also inform used psa + if (entry->selectedPsaSvcId >= 0) { + //note that it is possible that the psa is already gone, in that case the call is also not needed anymore. + celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, + (void *) endpoint, pstm_removeEndpointCallback); + } else { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "No selected psa for endpoint %s\n", entry->uuid); + } + celix_properties_destroy(entry->endpoint); + free(entry); + } + + + return CELIX_SUCCESS; } @@ -559,45 +586,48 @@ static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) { if (entry != NULL && (entry->usageCount <= 0 || entry->needsMatch)) { if (manager->verbose && entry->endpoint != NULL) { logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, - "[PSTM] Tearing down TopicSender for scope/topic %s/%s\n", entry->scope, entry->topic); + "[PSTM] Tearing down TopicSender for scope/topic %s/%s\n", entry->scope, entry->topic); } if (entry->endpoint != NULL) { - celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); - for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) { - pubsub_announce_endpoint_listener_t *listener; - listener = celix_arrayList_get(manager->announceEndpointListeners.list, i); - listener->removeEndpoint(listener->handle, entry->endpoint); - } - celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); - celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, - PUBSUB_ADMIN_SERVICE_NAME, - entry, pstm_teardownTopicSenderCallback); - } + celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); + for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) { + pubsub_announce_endpoint_listener_t *listener; + listener = celix_arrayList_get(manager->announceEndpointListeners.list, i); + listener->revokeEndpoint(listener->handle, entry->endpoint); + } + celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); + celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, + PUBSUB_ADMIN_SERVICE_NAME, + entry, pstm_teardownTopicSenderCallback); + } //cleanup entry if (entry->usageCount <= 0) { - //no usage -> remove - hashMapIterator_remove(&iter); - free(entry->scopeAndTopicKey); - free(entry->scope); - free(entry->topic); - if (entry->publisherFilter != NULL) { - celix_filter_destroy(entry->publisherFilter); - } - if (entry->endpoint != NULL) { - celix_properties_destroy(entry->endpoint); - } - free(entry); - } else { - //still usage, setup for new match - if (entry->endpoint != NULL) { - celix_properties_destroy(entry->endpoint); - } - entry->endpoint = NULL; - entry->selectedPsaSvcId = -1L; - entry->selectedSerializerSvcId = -1L; + //no usage -> remove + hashMapIterator_remove(&iter); + free(entry->scopeAndTopicKey); + free(entry->scope); + free(entry->topic); + if (entry->topicProperties != NULL) { + celix_properties_destroy(entry->topicProperties); + } + if (entry->publisherFilter != NULL) { + celix_filter_destroy(entry->publisherFilter); + } + if (entry->endpoint != NULL) { + celix_properties_destroy(entry->endpoint); + } + free(entry); + } else { + //still usage, setup for new match + if (entry->endpoint != NULL) { + celix_properties_destroy(entry->endpoint); + } + entry->endpoint = NULL; + entry->selectedPsaSvcId = -1L; + entry->selectedSerializerSvcId = -1L; } } } @@ -625,215 +655,235 @@ static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) { } if (entry->endpoint != NULL) { - celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, - PUBSUB_ADMIN_SERVICE_NAME, entry, - pstm_teardownTopicReceiverCallback); - celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); - for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) { - pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get( - manager->announceEndpointListeners.list, i); - listener->removeEndpoint(listener->handle, entry->endpoint); - } - celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); - } - - if (entry->usageCount <= 0) { - //no usage -> remove - hashMapIterator_remove(&iter); - //cleanup entry - free(entry->scopeAndTopicKey); - free(entry->scope); - free(entry->topic); - if (entry->subscriberProperties != NULL) { - celix_properties_destroy(entry->subscriberProperties); - } - if (entry->endpoint != NULL) { - celix_properties_destroy(entry->endpoint); - } - free(entry); - } else { - //still usage -> setup for rematch - if (entry->endpoint != NULL) { - celix_properties_destroy(entry->endpoint); - } - entry->endpoint = NULL; - entry->selectedPsaSvcId = -1L; - entry->selectedSerializerSvcId = -1L; - } + celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, + PUBSUB_ADMIN_SERVICE_NAME, entry, + pstm_teardownTopicReceiverCallback); + celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); + for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) { + pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get( + manager->announceEndpointListeners.list, i); + listener->revokeEndpoint(listener->handle, entry->endpoint); + } + celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); + } + + if (entry->usageCount <= 0) { + //no usage -> remove + hashMapIterator_remove(&iter); + //cleanup entry + free(entry->scopeAndTopicKey); + free(entry->scope); + free(entry->topic); + if (entry->topicProperties != NULL) { + celix_properties_destroy(entry->topicProperties); + } + if (entry->subscriberProperties != NULL) { + celix_properties_destroy(entry->subscriberProperties); + } + if (entry->endpoint != NULL) { + celix_properties_destroy(entry->endpoint); + } + free(entry); + } else { + //still usage -> setup for rematch + if (entry->endpoint != NULL) { + celix_properties_destroy(entry->endpoint); + } + entry->endpoint = NULL; + entry->selectedPsaSvcId = -1L; + entry->selectedSerializerSvcId = -1L; + } } } celixThreadMutex_unlock(&manager->topicReceivers.mutex); } static void pstm_addEndpointCallback(void *handle, void *svc) { - celix_properties_t *endpoint = handle; - pubsub_admin_service_t *psa = svc; - psa->addEndpoint(psa->handle, endpoint); + celix_properties_t *endpoint = handle; + pubsub_admin_service_t *psa = svc; + psa->addDiscoveredEndpoint(psa->handle, endpoint); } static void pstm_findPsaForEndpoints(pubsub_topology_manager_t *manager) { - celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map); - while (hashMapIterator_hasNext(&iter)) { - pstm_discovered_endpoint_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL && entry->selectedPsaSvcId < 0) { - long psaSvcId = -1L; - - celixThreadMutex_lock(&manager->pubsubadmins.mutex); - hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map); - while (hashMapIterator_hasNext(&iter2)) { - hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2); - pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry); - long svcId = (long) hashMapEntry_getKey(mapEntry); - bool match = false; - psa->matchEndpoint(psa->handle, entry->endpoint, &match); - if (match) { - psaSvcId = svcId; - break; - } - } - celixThreadMutex_unlock(&manager->pubsubadmins.mutex); - - if (psaSvcId >= 0) { - celix_bundleContext_useServiceWithId(manager->context, psaSvcId, PUBSUB_ADMIN_SERVICE_NAME, - (void *)entry->endpoint, pstm_addEndpointCallback); - } else { - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "Cannot find psa for endpoint %s\n", entry->uuid); - } - - entry->selectedPsaSvcId = psaSvcId; - } - } - celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); + celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_discovered_endpoint_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry != NULL && entry->selectedPsaSvcId < 0) { + long psaSvcId = -1L; + + celixThreadMutex_lock(&manager->pubsubadmins.mutex); + hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map); + while (hashMapIterator_hasNext(&iter2)) { + hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2); + pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry); + long svcId = (long) hashMapEntry_getKey(mapEntry); + bool match = false; + psa->matchDiscoveredEndpoint(psa->handle, entry->endpoint, &match); + if (match) { + psaSvcId = svcId; + break; + } + } + celixThreadMutex_unlock(&manager->pubsubadmins.mutex); + + if (psaSvcId >= 0) { + celix_bundleContext_useServiceWithId(manager->context, psaSvcId, PUBSUB_ADMIN_SERVICE_NAME, + (void *) entry->endpoint, pstm_addEndpointCallback); + } else { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "Cannot find psa for endpoint %s\n", entry->uuid); + } + + entry->selectedPsaSvcId = psaSvcId; + } + } + celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); } static void pstm_setupTopicSenderCallback(void *handle, void *svc) { - pstm_topic_receiver_or_sender_entry_t *entry = handle; - pubsub_admin_service_t *psa = svc; - psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint); + pstm_topic_receiver_or_sender_entry_t *entry = handle; + pubsub_admin_service_t *psa = svc; + psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, &entry->endpoint); } static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) { - celixThreadMutex_lock(&manager->topicSenders.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map); - while (hashMapIterator_hasNext(&iter)) { - pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL && entry->needsMatch) { - //new topic sender needed, requesting match with current psa - double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE; - long serializerSvcId = -1L; - long selectedPsaSvcId = -1L; - - celixThreadMutex_lock(&manager->pubsubadmins.mutex); - hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map); - while (hashMapIterator_hasNext(&iter2)) { - hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2); - long svcId = (long)hashMapEntry_getKey(mapEntry); - pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry); - double score = PUBSUB_ADMIN_NO_MATCH_SCORE; - long serSvcId = -1L; - psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &score, &serSvcId); - if (score > highestScore) { - highestScore = score; - serializerSvcId = serSvcId; + celixThreadMutex_lock(&manager->topicSenders.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry != NULL && entry->needsMatch) { + //new topic sender needed, requesting match with current psa + double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE; + long serializerSvcId = -1L; + long selectedPsaSvcId = -1L; + celix_properties_t *topicPropertiesForHighestMatch = NULL; + + celixThreadMutex_lock(&manager->pubsubadmins.mutex); + hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map); + while (hashMapIterator_hasNext(&iter2)) { + hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2); + long svcId = (long) hashMapEntry_getKey(mapEntry); + pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry); + double score = PUBSUB_ADMIN_NO_MATCH_SCORE; + long serSvcId = -1L; + celix_properties_t *topicProps = NULL; + psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &topicProps, &score, &serSvcId); + if (score > highestScore) { + if (topicPropertiesForHighestMatch != NULL) { + celix_properties_destroy(topicPropertiesForHighestMatch); + } + highestScore = score; + serializerSvcId = serSvcId; selectedPsaSvcId = svcId; - } - } - celixThreadMutex_unlock(&manager->pubsubadmins.mutex); - - if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) { - entry->selectedPsaSvcId = selectedPsaSvcId; - entry->selectedSerializerSvcId = serializerSvcId; - bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback); - - if (called && entry->endpoint != NULL) { - entry->needsMatch = false; - - //announce new endpoint through the network - celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); - for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) { - pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i); - listener->announceEndpoint(listener->handle, entry->endpoint); - } - celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); - } - } - - if (entry->needsMatch) { - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicSender for %s/%s\n", entry->scope, entry->topic); - } - } - } - celixThreadMutex_unlock(&manager->topicSenders.mutex); + topicPropertiesForHighestMatch = topicProps; + } else if (topicProps != NULL) { + celix_properties_destroy(topicProps); + } + } + celixThreadMutex_unlock(&manager->pubsubadmins.mutex); + + if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) { + entry->selectedPsaSvcId = selectedPsaSvcId; + entry->selectedSerializerSvcId = serializerSvcId; + entry->topicProperties = topicPropertiesForHighestMatch; + bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback); + + if (called && entry->endpoint != NULL) { + entry->needsMatch = false; + + //announce new endpoint through the network + celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); + for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) { + pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i); + listener->announceEndpoint(listener->handle, entry->endpoint); + } + celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); + } + } + + if (entry->needsMatch) { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicSender for %s/%s\n", entry->scope, entry->topic); + } + } + } + celixThreadMutex_unlock(&manager->topicSenders.mutex); } static void pstm_setupTopicReceiverCallback(void *handle, void *svc) { pstm_topic_receiver_or_sender_entry_t *entry = handle; pubsub_admin_service_t *psa = svc; - psa->setupTopicReceiver(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint); + psa->setupTopicReceiver(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, &entry->endpoint); } static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) { - celixThreadMutex_lock(&manager->topicReceivers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map); - while (hashMapIterator_hasNext(&iter)) { - pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry->needsMatch) { - - double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE; - long serializerSvcId = -1L; - long selectedPsaSvcId = -1L; - - celixThreadMutex_lock(&manager->pubsubadmins.mutex); - hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map); - while (hashMapIterator_hasNext(&iter2)) { - hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2); - long svcId = (long) hashMapEntry_getKey(mapEntry); - pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry); - double score = PUBSUB_ADMIN_NO_MATCH_SCORE; - long serSvcId = -1L; - - psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &score, &serSvcId); - if (score > highestScore) { - highestScore = score; - serializerSvcId = serSvcId; + celixThreadMutex_lock(&manager->topicReceivers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry->needsMatch) { + + double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE; + long serializerSvcId = -1L; + long selectedPsaSvcId = -1L; + celix_properties_t *highestMatchTopicProperties = NULL; + + celixThreadMutex_lock(&manager->pubsubadmins.mutex); + hash_map_iterator_t iter2 = hashMapIterator_construct(manager->pubsubadmins.map); + while (hashMapIterator_hasNext(&iter2)) { + hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter2); + long svcId = (long) hashMapEntry_getKey(mapEntry); + pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry); + double score = PUBSUB_ADMIN_NO_MATCH_SCORE; + long serSvcId = -1L; + celix_properties_t *topicProps = NULL; + + psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &topicProps, &score, &serSvcId); + if (score > highestScore) { + if (highestMatchTopicProperties != NULL) { + celix_properties_destroy(highestMatchTopicProperties); + } + highestScore = score; + serializerSvcId = serSvcId; selectedPsaSvcId = svcId; - } - } - celixThreadMutex_unlock(&manager->pubsubadmins.mutex); - - if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) { - entry->selectedPsaSvcId = selectedPsaSvcId; - entry->selectedSerializerSvcId = serializerSvcId; - - bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, - entry, - pstm_setupTopicReceiverCallback); - - if (called && entry->endpoint != NULL) { - entry->needsMatch = false; - //announce new endpoint through the network - celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); - for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) { - pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get( - manager->announceEndpointListeners.list, i); - listener->announceEndpoint(listener->handle, entry->endpoint); - } - celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); - } - } - - - if (entry->needsMatch) { - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicReceiver for %s/%s\n", entry->scope, entry->topic); - } - } - } - celixThreadMutex_unlock(&manager->topicReceivers.mutex); + highestMatchTopicProperties = topicProps; + } else if (topicProps != NULL) { + celix_properties_destroy(topicProps); + } + } + celixThreadMutex_unlock(&manager->pubsubadmins.mutex); + + if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) { + entry->selectedPsaSvcId = selectedPsaSvcId; + entry->selectedSerializerSvcId = serializerSvcId; + + bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, + entry, + pstm_setupTopicReceiverCallback); + + if (called && entry->endpoint != NULL) { + entry->needsMatch = false; + //announce new endpoint through the network + celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); + for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) { + pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get( + manager->announceEndpointListeners.list, i); + listener->announceEndpoint(listener->handle, entry->endpoint); + } + celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); + } + } + + + if (entry->needsMatch) { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicReceiver for %s/%s\n", entry->scope, entry->topic); + } + } + } + celixThreadMutex_unlock(&manager->topicReceivers.mutex); } -static void* pstm_psaHandlingThread(void *data) { +static void *pstm_psaHandlingThread(void *data) { pubsub_topology_manager_t *manager = data; celixThreadMutex_lock(&manager->psaHandling.mutex); @@ -841,7 +891,7 @@ static void* pstm_psaHandlingThread(void *data) { celixThreadMutex_unlock(&manager->psaHandling.mutex); while (running) { - //first teardown -> also if rematch is needed + //first teardown -> also if rematch is needed pstm_teardownTopicSenders(manager); pstm_teardownTopicReceivers(manager); @@ -849,7 +899,7 @@ static void* pstm_psaHandlingThread(void *data) { pstm_setupTopicSenders(manager); pstm_setupTopicReceivers(manager); - pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa + pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa celixThreadMutex_lock(&manager->psaHandling.mutex); celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS, 0L); @@ -860,92 +910,141 @@ static void* pstm_psaHandlingThread(void *data) { } -celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) { - pubsub_topology_manager_t *manager = handle; - //TODO add support for searching based on scope and topic +celix_status_t pubsub_topologyManager_shellCommand(void *handle, char *commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) { + pubsub_topology_manager_t *manager = handle; + //TODO add support for searching based on scope and topic - fprintf(os, "\n"); + fprintf(os, "\n"); - fprintf(os, "Discovered Endpoints: \n"); - celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map); - while (hashMapIterator_hasNext(&iter)) { - pstm_discovered_endpoint_entry_t *discovered = hashMapIterator_nextValue(&iter); + fprintf(os, "Discovered Endpoints: \n"); + celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_discovered_endpoint_entry_t *discovered = hashMapIterator_nextValue(&iter); const char *cn = celix_properties_get(discovered->endpoint, "container_name", "!Error!"); const char *fwuuid = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, "!Error!"); const char *type = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!"); - const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!"); - const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!"); - const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); - fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid); + const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!"); + const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!"); + const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); + const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); + fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid); fprintf(os, " |- container name = %s\n", cn); fprintf(os, " |- fw uuid = %s\n", fwuuid); fprintf(os, " |- type = %s\n", type); - fprintf(os, " |- scope = %s\n", scope); - fprintf(os, " |- topic = %s\n", topic); - fprintf(os, " |- admin type = %s\n", adminType); - fprintf(os, " |- serializer = %s\n", serType); - if (manager->verbose) { - fprintf(os, " |- psa svc id = %li\n", discovered->selectedPsaSvcId); - fprintf(os, " |- usage count = %i\n", discovered->usageCount); - } - } - celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); - fprintf(os,"\n"); - - - fprintf(os, "Active Topic Senders:\n"); - celixThreadMutex_lock(&manager->topicSenders.mutex); - iter = hashMapIterator_construct(manager->topicSenders.map); - while (hashMapIterator_hasNext(&iter)) { - pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry->endpoint == NULL) { - continue; - } - const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!"); - const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); - fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid); - fprintf(os, " |- scope = %s\n", entry->scope); - fprintf(os, " |- topic = %s\n", entry->topic); - fprintf(os, " |- admin type = %s\n", adminType); - fprintf(os, " |- serializer = %s\n", serType); - if (manager->verbose) { - fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId); - fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId); - fprintf(os, " |- usage count = %i\n", entry->usageCount); - } - } - celixThreadMutex_unlock(&manager->topicSenders.mutex); - fprintf(os,"\n"); - - fprintf(os, "Active Topic Receivers:\n"); - celixThreadMutex_lock(&manager->topicReceivers.mutex); - iter = hashMapIterator_construct(manager->topicReceivers.map); - while (hashMapIterator_hasNext(&iter)) { - pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry->endpoint == NULL) { - continue; - } - const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!"); - const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); - fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid); - fprintf(os, " |- scope = %s\n", entry->scope); - fprintf(os, " |- topic = %s\n", entry->topic); - fprintf(os, " |- admin type = %s\n", adminType); - fprintf(os, " |- serializer = %s\n", serType); - if (manager->verbose) { - fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId); - fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId); - fprintf(os, " |- usage count = %i\n", entry->usageCount); - } - } - celixThreadMutex_unlock(&manager->topicReceivers.mutex); - fprintf(os,"\n"); - - fprintf(os, "TODO pending topic senders/receivers\n"); - - return CELIX_SUCCESS; + fprintf(os, " |- scope = %s\n", scope); + fprintf(os, " |- topic = %s\n", topic); + fprintf(os, " |- admin type = %s\n", adminType); + fprintf(os, " |- serializer = %s\n", serType); + if (manager->verbose) { + fprintf(os, " |- psa svc id = %li\n", discovered->selectedPsaSvcId); + fprintf(os, " |- usage count = %i\n", discovered->usageCount); + } + } + celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); + fprintf(os, "\n"); + + + fprintf(os, "Active Topic Senders:\n"); + int countPendingSenders = 0; + celixThreadMutex_lock(&manager->topicSenders.mutex); + iter = hashMapIterator_construct(manager->topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry->endpoint == NULL) { + ++countPendingSenders; + continue; + } + const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!"); + const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); + const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); + fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid); + fprintf(os, " |- scope = %s\n", entry->scope); + fprintf(os, " |- topic = %s\n", entry->topic); + fprintf(os, " |- admin type = %s\n", adminType); + fprintf(os, " |- serializer = %s\n", serType); + if (manager->verbose) { + fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId); + fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId); + fprintf(os, " |- usage count = %i\n", entry->usageCount); + } + } + celixThreadMutex_unlock(&manager->topicSenders.mutex); + fprintf(os, "\n"); + + fprintf(os, "Active Topic Receivers:\n"); + int countPendingReceivers = 0; + celixThreadMutex_lock(&manager->topicReceivers.mutex); + iter = hashMapIterator_construct(manager->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry->endpoint == NULL) { + ++countPendingReceivers; + continue; + } + const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!"); + const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); + const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); + fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid); + fprintf(os, " |- scope = %s\n", entry->scope); + fprintf(os, " |- topic = %s\n", entry->topic); + fprintf(os, " |- admin type = %s\n", adminType); + fprintf(os, " |- serializer = %s\n", serType); + if (manager->verbose) { + fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId); + fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId); + fprintf(os, " |- usage count = %i\n", entry->usageCount); + } + } + celixThreadMutex_unlock(&manager->topicReceivers.mutex); + fprintf(os, "\n"); + + if (countPendingSenders > 0) { + fprintf(os, "Pending Topic Senders:\n"); + celixThreadMutex_lock(&manager->topicSenders.mutex); + iter = hashMapIterator_construct(manager->topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry->endpoint == NULL) { + fprintf(os, "|- Pending Topic Sender for scope/topic %s/%s:\n", entry->scope, entry->topic); + const char *requestedQos = celix_properties_get(entry->topicProperties, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, "(None)"); + const char *requestedConfig = celix_properties_get(entry->topicProperties, PUBSUB_ADMIN_TYPE_KEY, "(None)"); + const char *requestedSer = celix_properties_get(entry->topicProperties, PUBSUB_SERIALIZER_TYPE_KEY, "(None)"); + fprintf(os, " |- requested qos = %s\n", requestedQos); + fprintf(os, " |- requested config = %s\n", requestedConfig); + fprintf(os, " |- requested serializer = %s\n", requestedSer); + if (manager->verbose) { + fprintf(os, " |- usage count = %i\n", entry->usageCount); + } + } + } + celixThreadMutex_unlock(&manager->topicSenders.mutex); + fprintf(os, "\n"); + } + + if (countPendingReceivers > 0) { + fprintf(os, "Pending Topic Receivers:\n"); + celixThreadMutex_lock(&manager->topicReceivers.mutex); + iter = hashMapIterator_construct(manager->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry->endpoint == NULL) { + fprintf(os, "|- Topic Receiver for scope/topic %s/%s:\n", entry->scope, entry->topic); + const char *requestedQos = celix_properties_get(entry->topicProperties, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, "(None)"); + const char *requestedConfig = celix_properties_get(entry->topicProperties, PUBSUB_ADMIN_TYPE_KEY, "(None)"); + const char *requestedSer = celix_properties_get(entry->topicProperties, PUBSUB_SERIALIZER_TYPE_KEY, "(None)"); + fprintf(os, " |- requested qos = %s\n", requestedQos); + fprintf(os, " |- requested config = %s\n", requestedConfig); + fprintf(os, " |- requested serializer = %s\n", requestedSer); + if (manager->verbose) { + fprintf(os, " |- usage count = %i\n", entry->usageCount); + } + } + } + celixThreadMutex_unlock(&manager->topicReceivers.mutex); + fprintf(os, "\n"); + } + + + return CELIX_SUCCESS; } http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h index 39277fe..e66831e 100644 --- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h +++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h @@ -94,6 +94,7 @@ typedef struct pstm_topic_receiver_or_sender_entry { long selectedPsaSvcId; long selectedSerializerSvcId; long bndId; + celix_properties_t *topicProperties; //found in META-INF/(pub|sub)/(topic).properties //for sender entry celix_filter_t *publisherFilter; @@ -104,7 +105,6 @@ typedef struct pstm_topic_receiver_or_sender_entry { celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **manager); celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager); -celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_t *manager); void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_properties_t *props); void pubsub_topologyManager_psaRemoved(void *handle, void *svc, const celix_properties_t *props); http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/libs/utils/src/properties.c ---------------------------------------------------------------------- diff --git a/libs/utils/src/properties.c b/libs/utils/src/properties.c index 1519ecc..3c7ba74 100644 --- a/libs/utils/src/properties.c +++ b/libs/utils/src/properties.c @@ -357,7 +357,10 @@ celix_properties_t* celix_properties_copy(const celix_properties_t *properties) } const char* celix_properties_get(const celix_properties_t *properties, const char *key, const char *defaultValue) { - const char* value = hashMap_get((hash_map_t*)properties, (void*)key); + const char* value = NULL; + if (properties != NULL) { + value = hashMap_get((hash_map_t*)properties, (void*)key); + } return value == NULL ? defaultValue : value; }
