Repository: celix Updated Branches: refs/heads/develop 18c35d657 -> 96cd7e023
http://git-wip-us.apache.org/repos/asf/celix/blob/96cd7e02/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c index 0c075fc..f0b94c5 100644 --- a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c +++ b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c @@ -46,71 +46,70 @@ static bool pubsub_discovery_isEndpointValid(pubsub_endpoint_pt psEp); /* Discovery activator functions */ celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt *ps_discovery) { - celix_status_t status = CELIX_SUCCESS; - - *ps_discovery = calloc(1, sizeof(**ps_discovery)); - - if (*ps_discovery == NULL) { - status = CELIX_ENOMEM; - } - else{ - (*ps_discovery)->context = context; - (*ps_discovery)->discoveredPubs = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*ps_discovery)->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL); - (*ps_discovery)->watchers = hashMap_create(utils_stringHash,NULL,utils_stringEquals, NULL); - (*ps_discovery)->verbose = PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE; - celixThreadMutex_create(&(*ps_discovery)->listenerReferencesMutex, NULL); - celixThreadMutex_create(&(*ps_discovery)->discoveredPubsMutex, NULL); - celixThreadMutex_create(&(*ps_discovery)->watchersMutex, NULL); - } - - const char *verboseStr = NULL; - bundleContext_getProperty(context, PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY, &verboseStr); - if (verboseStr != NULL) { - (*ps_discovery)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0; - } - - return status; + celix_status_t status = CELIX_SUCCESS; + + *ps_discovery = calloc(1, sizeof(**ps_discovery)); + + if (*ps_discovery == NULL) { + return CELIX_ENOMEM; + } + + (*ps_discovery)->context = context; + (*ps_discovery)->discoveredPubs = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*ps_discovery)->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL); + (*ps_discovery)->watchers = hashMap_create(utils_stringHash,NULL,utils_stringEquals, NULL); + (*ps_discovery)->verbose = PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE; + celixThreadMutex_create(&(*ps_discovery)->listenerReferencesMutex, NULL); + celixThreadMutex_create(&(*ps_discovery)->discoveredPubsMutex, NULL); + celixThreadMutex_create(&(*ps_discovery)->watchersMutex, NULL); + + const char *verboseStr = NULL; + bundleContext_getProperty(context, PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY, &verboseStr); + if (verboseStr != NULL) { + (*ps_discovery)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0; + } + + return status; } celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt ps_discovery) { - celix_status_t status = CELIX_SUCCESS; + celix_status_t status = CELIX_SUCCESS; - celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex); + celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex); - hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->discoveredPubs); + hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->discoveredPubs); - while (hashMapIterator_hasNext(iter)) { - array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter); + while (hashMapIterator_hasNext(iter)) { + array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter); - for(int i=0; i < arrayList_size(pubEP_list); i++) { - pubsubEndpoint_destroy(((pubsub_endpoint_pt)arrayList_get(pubEP_list,i))); - } - arrayList_destroy(pubEP_list); - } + for(int i=0; i < arrayList_size(pubEP_list); i++) { + pubsubEndpoint_destroy(((pubsub_endpoint_pt)arrayList_get(pubEP_list,i))); + } + arrayList_destroy(pubEP_list); + } - hashMapIterator_destroy(iter); + hashMapIterator_destroy(iter); - hashMap_destroy(ps_discovery->discoveredPubs, true, false); - ps_discovery->discoveredPubs = NULL; + hashMap_destroy(ps_discovery->discoveredPubs, true, false); + ps_discovery->discoveredPubs = NULL; - celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex); + celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex); - celixThreadMutex_destroy(&ps_discovery->discoveredPubsMutex); + celixThreadMutex_destroy(&ps_discovery->discoveredPubsMutex); - celixThreadMutex_lock(&ps_discovery->listenerReferencesMutex); + celixThreadMutex_lock(&ps_discovery->listenerReferencesMutex); - hashMap_destroy(ps_discovery->listenerReferences, false, false); - ps_discovery->listenerReferences = NULL; + hashMap_destroy(ps_discovery->listenerReferences, false, false); + ps_discovery->listenerReferences = NULL; - celixThreadMutex_unlock(&ps_discovery->listenerReferencesMutex); + celixThreadMutex_unlock(&ps_discovery->listenerReferencesMutex); - celixThreadMutex_destroy(&ps_discovery->listenerReferencesMutex); + celixThreadMutex_destroy(&ps_discovery->listenerReferencesMutex); - free(ps_discovery); + free(ps_discovery); - return status; + return status; } celix_status_t pubsub_discovery_start(pubsub_discovery_pt ps_discovery) { @@ -182,48 +181,48 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) { /* Functions called by the etcd_watcher */ celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) { - celix_status_t status = CELIX_SUCCESS; - - bool valid = pubsub_discovery_isEndpointValid(pubEP); - if (!valid) { - status = CELIX_ILLEGAL_STATE; - return status; - } - - bool inform = false; - celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); - - char *pubs_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); - array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pubs_key); - if(pubEP_list==NULL){ - arrayList_create(&pubEP_list); - arrayList_add(pubEP_list,pubEP); - hashMap_put(pubsub_discovery->discoveredPubs,strdup(pubs_key),pubEP_list); - inform=true; - } - else{ - int i; - bool found = false; - for(i=0;i<arrayList_size(pubEP_list) && !found;i++){ - found = pubsubEndpoint_equals(pubEP,(pubsub_endpoint_pt)arrayList_get(pubEP_list,i)); - } - if(found){ - pubsubEndpoint_destroy(pubEP); - } - else{ - arrayList_add(pubEP_list,pubEP); - inform=true; - } - } - free(pubs_key); - - celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); - - if(inform){ - status = pubsub_discovery_informPublishersListeners(pubsub_discovery,pubEP,true); - } - - return status; + celix_status_t status = CELIX_SUCCESS; + + bool valid = pubsub_discovery_isEndpointValid(pubEP); + if (!valid) { + status = CELIX_ILLEGAL_STATE; + return status; + } + + bool inform = false; + celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); + + char *pubs_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pubs_key); + if(pubEP_list==NULL){ + arrayList_create(&pubEP_list); + arrayList_add(pubEP_list,pubEP); + hashMap_put(pubsub_discovery->discoveredPubs,strdup(pubs_key),pubEP_list); + inform=true; + } + else{ + int i; + bool found = false; + for(i=0;i<arrayList_size(pubEP_list) && !found;i++){ + found = pubsubEndpoint_equals(pubEP,(pubsub_endpoint_pt)arrayList_get(pubEP_list,i)); + } + if(found){ + pubsubEndpoint_destroy(pubEP); + } + else{ + arrayList_add(pubEP_list,pubEP); + inform=true; + } + } + free(pubs_key); + + celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); + + if(inform){ + status = pubsub_discovery_informPublishersListeners(pubsub_discovery,pubEP,true); + } + + return status; } celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) { @@ -237,7 +236,7 @@ celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt pubsub_discovery, free(pubs_key); if (pubEP_list == NULL) { printf("WARNING PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n", - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); status = CELIX_ILLEGAL_STATE; } else { int i; @@ -263,123 +262,123 @@ celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt pubsub_discovery, /* Callback to the pubsub_topology_manager */ celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP, bool epAdded) { - celix_status_t status = CELIX_SUCCESS; + celix_status_t status = CELIX_SUCCESS; - // Inform listeners of new publisher endpoint - celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex); + // Inform listeners of new publisher endpoint + celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex); - if (pubsub_discovery->listenerReferences != NULL) { - hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->listenerReferences); - while (hashMapIterator_hasNext(iter)) { - service_reference_pt reference = hashMapIterator_nextKey(iter); + if (pubsub_discovery->listenerReferences != NULL) { + hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->listenerReferences); + while (hashMapIterator_hasNext(iter)) { + service_reference_pt reference = hashMapIterator_nextKey(iter); - publisher_endpoint_announce_pt listener = NULL; + publisher_endpoint_announce_pt listener = NULL; - bundleContext_getService(pubsub_discovery->context, reference, (void**) &listener); + bundleContext_getService(pubsub_discovery->context, reference, (void**) &listener); if (epAdded) { listener->announcePublisher(listener->handle, pubEP); } else { listener->removePublisher(listener->handle, pubEP); } bundleContext_ungetService(pubsub_discovery->context, reference, NULL); - } - hashMapIterator_destroy(iter); - } + } + hashMapIterator_destroy(iter); + } - celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex); + celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex); - return status; + return status; } /* Service's functions implementation */ celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP) { - celix_status_t status = CELIX_SUCCESS; - pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle; + celix_status_t status = CELIX_SUCCESS; + pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle; - bool valid = pubsub_discovery_isEndpointValid(pubEP); - if (!valid) { - status = CELIX_ILLEGAL_ARGUMENT; - return status; - } + bool valid = pubsub_discovery_isEndpointValid(pubEP); + if (!valid) { + status = CELIX_ILLEGAL_ARGUMENT; + return status; + } - if (pubsub_discovery->verbose) { - printf("pubsub_discovery_announcePublisher : %s / %s\n", - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); - } + if (pubsub_discovery->verbose) { + printf("pubsub_discovery_announcePublisher : %s / %s\n", + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } - celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); + celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); - char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); - array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key); + char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key); - if(pubEP_list==NULL){ - arrayList_create(&pubEP_list); - hashMap_put(pubsub_discovery->discoveredPubs,strdup(pub_key),pubEP_list); - } - free(pub_key); - pubsub_endpoint_pt p = NULL; - pubsubEndpoint_clone(pubEP, &p); + if(pubEP_list==NULL){ + arrayList_create(&pubEP_list); + hashMap_put(pubsub_discovery->discoveredPubs,strdup(pub_key),pubEP_list); + } + free(pub_key); + pubsub_endpoint_pt p = NULL; + pubsubEndpoint_clone(pubEP, &p); - arrayList_add(pubEP_list,p); + arrayList_add(pubEP_list,p); - status = etcdWriter_addPublisherEndpoint(pubsub_discovery->writer,p,true); + status = etcdWriter_addPublisherEndpoint(pubsub_discovery->writer,p,true); - celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); + celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); - return status; + return status; } celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP) { - celix_status_t status = CELIX_SUCCESS; - pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle; + celix_status_t status = CELIX_SUCCESS; + pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle; - bool valid = pubsub_discovery_isEndpointValid(pubEP); - if (!valid) { - status = CELIX_ILLEGAL_ARGUMENT; - return status; - } + bool valid = pubsub_discovery_isEndpointValid(pubEP); + if (!valid) { + status = CELIX_ILLEGAL_ARGUMENT; + return status; + } - celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); + celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); - char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); - array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key); - free(pub_key); - if(pubEP_list==NULL){ - printf("WARNING PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); - status = CELIX_ILLEGAL_STATE; - } - else{ + char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key); + free(pub_key); + if(pubEP_list==NULL){ + printf("WARNING PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + status = CELIX_ILLEGAL_STATE; + } + else{ - int i; - bool found = false; - pubsub_endpoint_pt p = NULL; + int i; + bool found = false; + pubsub_endpoint_pt p = NULL; - for(i=0;!found && i<arrayList_size(pubEP_list);i++){ - p = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); - found = pubsubEndpoint_equals(pubEP,p); - } + for(i=0;!found && i<arrayList_size(pubEP_list);i++){ + p = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); + found = pubsubEndpoint_equals(pubEP,p); + } - if(!found){ - printf("WARNING PSD: Trying to remove a not existing endpoint. Something is not consistent.\n"); - status = CELIX_ILLEGAL_STATE; - } - else{ + if(!found){ + printf("WARNING PSD: Trying to remove a not existing endpoint. Something is not consistent.\n"); + status = CELIX_ILLEGAL_STATE; + } + else{ - arrayList_removeElement(pubEP_list,p); + arrayList_removeElement(pubEP_list,p); - status = etcdWriter_deletePublisherEndpoint(pubsub_discovery->writer,p); + status = etcdWriter_deletePublisherEndpoint(pubsub_discovery->writer,p); - pubsubEndpoint_destroy(p); - } - } + pubsubEndpoint_destroy(p); + } + } - celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); + celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); - return status; + return status; } celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic) { @@ -432,100 +431,100 @@ celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* sc /* pubsub_topology_manager tracker callbacks */ celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; + celix_status_t status = CELIX_SUCCESS; - pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt)handle; - publisher_endpoint_announce_pt listener = (publisher_endpoint_announce_pt)service; + pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt)handle; + publisher_endpoint_announce_pt listener = (publisher_endpoint_announce_pt)service; - celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); - celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex); + celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); + celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex); - /* Notify the PSTM about discovered publisher endpoints */ - hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->discoveredPubs); - while(hashMapIterator_hasNext(iter)){ - array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter); - int i; - for(i=0;i<arrayList_size(pubEP_list);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); - status += listener->announcePublisher(listener->handle, pubEP); - } - } + /* Notify the PSTM about discovered publisher endpoints */ + hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->discoveredPubs); + while(hashMapIterator_hasNext(iter)){ + array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter); + int i; + for(i=0;i<arrayList_size(pubEP_list);i++){ + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); + status += listener->announcePublisher(listener->handle, pubEP); + } + } - hashMapIterator_destroy(iter); + hashMapIterator_destroy(iter); - hashMap_put(pubsub_discovery->listenerReferences, reference, NULL); + hashMap_put(pubsub_discovery->listenerReferences, reference, NULL); - celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex); - celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); + celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex); + celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); - if (pubsub_discovery->verbose) { - printf("PSD: pubsub_tm_announce_publisher added.\n"); - } + if (pubsub_discovery->verbose) { + printf("PSD: pubsub_tm_announce_publisher added.\n"); + } - return status; + return status; } celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; + celix_status_t status = CELIX_SUCCESS; - status = pubsub_discovery_tmPublisherAnnounceRemoved(handle, reference, service); - if (status == CELIX_SUCCESS) { - status = pubsub_discovery_tmPublisherAnnounceAdded(handle, reference, service); - } + status = pubsub_discovery_tmPublisherAnnounceRemoved(handle, reference, service); + if (status == CELIX_SUCCESS) { + status = pubsub_discovery_tmPublisherAnnounceAdded(handle, reference, service); + } - return status; + return status; } celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; - pubsub_discovery_pt pubsub_discovery = handle; + celix_status_t status = CELIX_SUCCESS; + pubsub_discovery_pt pubsub_discovery = handle; - celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex); + celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex); - if (pubsub_discovery->listenerReferences != NULL) { - if (hashMap_remove(pubsub_discovery->listenerReferences, reference)) { - if (pubsub_discovery->verbose) { - printf("PSD: pubsub_tm_announce_publisher removed.\n"); - } - } - } - celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex); + if (pubsub_discovery->listenerReferences != NULL) { + if (hashMap_remove(pubsub_discovery->listenerReferences, reference)) { + if (pubsub_discovery->verbose) { + printf("PSD: pubsub_tm_announce_publisher removed.\n"); + } + } + } + celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex); - return status; + return status; } static bool pubsub_discovery_isEndpointValid(pubsub_endpoint_pt psEp) { - //required properties - bool valid = true; - static const char* keys[] = { - PUBSUB_ENDPOINT_UUID, - PUBSUB_ENDPOINT_FRAMEWORK_UUID, - PUBSUB_ENDPOINT_TYPE, - PUBSUB_ENDPOINT_ADMIN_TYPE, - PUBSUB_ENDPOINT_SERIALIZER, - PUBSUB_ENDPOINT_TOPIC_NAME, - PUBSUB_ENDPOINT_TOPIC_SCOPE, - NULL }; - int i; - for (i = 0; keys[i] != NULL; ++i) { - const char *val = properties_get(psEp->endpoint_props, keys[i]); - if (val == NULL) { //missing required key - fprintf(stderr, "[ERROR] PSD: Invalid endpoint missing key: '%s'\n", keys[i]); - valid = false; - } - } - if (!valid) { - const char *key = NULL; - fprintf(stderr, "PubSubEndpoint entries:\n"); - PROPERTIES_FOR_EACH(psEp->endpoint_props, key) { - fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->endpoint_props, key)); - } - if (psEp->topic_props != NULL) { - fprintf(stderr, "PubSubEndpoint topic properties entries:\n"); - PROPERTIES_FOR_EACH(psEp->topic_props, key) { - fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->topic_props, key)); - } - } - } - return valid; -} \ No newline at end of file + //required properties + bool valid = true; + static const char* keys[] = { + PUBSUB_ENDPOINT_UUID, + PUBSUB_ENDPOINT_FRAMEWORK_UUID, + PUBSUB_ENDPOINT_TYPE, + PUBSUB_ENDPOINT_ADMIN_TYPE, + PUBSUB_ENDPOINT_SERIALIZER, + PUBSUB_ENDPOINT_TOPIC_NAME, + PUBSUB_ENDPOINT_TOPIC_SCOPE, + NULL }; + int i; + for (i = 0; keys[i] != NULL; ++i) { + const char *val = properties_get(psEp->endpoint_props, keys[i]); + if (val == NULL) { //missing required key + fprintf(stderr, "[ERROR] PSD: Invalid endpoint missing key: '%s'\n", keys[i]); + valid = false; + } + } + if (!valid) { + const char *key = NULL; + fprintf(stderr, "PubSubEndpoint entries:\n"); + PROPERTIES_FOR_EACH(psEp->endpoint_props, key) { + fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->endpoint_props, key)); + } + if (psEp->topic_props != NULL) { + fprintf(stderr, "PubSubEndpoint topic properties entries:\n"); + PROPERTIES_FOR_EACH(psEp->topic_props, key) { + fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->topic_props, key)); + } + } + } + return valid; +} http://git-wip-us.apache.org/repos/asf/celix/blob/96cd7e02/pubsub/pubsub_spi/src/pubsub_endpoint.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/src/pubsub_endpoint.c b/pubsub/pubsub_spi/src/pubsub_endpoint.c index 535ade0..30b34cf 100644 --- a/pubsub/pubsub_spi/src/pubsub_endpoint.c +++ b/pubsub/pubsub_spi/src/pubsub_endpoint.c @@ -229,13 +229,15 @@ celix_status_t pubsubEndpoint_createFromServiceReference(bundle_context_t *ctx, celix_status_t pubsubEndpoint_createFromDiscoveredProperties(properties_t *discoveredProperties, pubsub_endpoint_pt* out) { celix_status_t status = CELIX_SUCCESS; + pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp)); - if (psEp != NULL) { - psEp->endpoint_props = discoveredProperties; - } else { - status = CELIX_ENOMEM; + + if (psEp == NULL) { + return CELIX_ENOMEM; } + psEp->endpoint_props = discoveredProperties; + if (!pubsubEndpoint_isEndpointValid(psEp)) { status = CELIX_ILLEGAL_STATE; } http://git-wip-us.apache.org/repos/asf/celix/blob/96cd7e02/pubsub/pubsub_spi/src/pubsub_utils.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/src/pubsub_utils.c b/pubsub/pubsub_spi/src/pubsub_utils.c index 2b38a17..5050077 100644 --- a/pubsub/pubsub_spi/src/pubsub_utils.c +++ b/pubsub/pubsub_spi/src/pubsub_utils.c @@ -69,11 +69,9 @@ celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topi } } - if (topic != NULL && objectClass != NULL && strncmp(objectClass, PUBSUB_PUBLISHER_SERVICE_NAME, 128) == 0) { + if (topic != NULL && scope != NULL && objectClass != NULL && strncmp(objectClass, PUBSUB_PUBLISHER_SERVICE_NAME, 128) == 0) { *topicOut = strdup(topic); - if (scope != NULL) { - *scopeOut = strdup(scope); - } + *scopeOut = strdup(scope); } else { *topicOut = NULL; *scopeOut = NULL;
