Repository: celix Updated Branches: refs/heads/develop a55fd1523 -> 109edf4d0
http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_api/include/pubsub/publisher.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_api/include/pubsub/publisher.h b/pubsub/pubsub_api/include/pubsub/publisher.h index 3eec149..9f7f3b6 100644 --- a/pubsub/pubsub_api/include/pubsub/publisher.h +++ b/pubsub/pubsub_api/include/pubsub/publisher.h @@ -30,12 +30,11 @@ #include <stdlib.h> #define PUBSUB_PUBLISHER_SERVICE_NAME "pubsub.publisher" -#define PUBSUB_PUBLISHER_SERVICE_VERSION "2.0.0" +#define PUBSUB_PUBLISHER_SERVICE_VERSION "2.0.0" //properties -#define PUBSUB_PUBLISHER_TOPIC "pubsub.topic" -#define PUBSUB_PUBLISHER_SCOPE "pubsub.scope" -#define PUBSUB_PUBLISHER_STRATEGY "pubsub.strategy" +#define PUBSUB_PUBLISHER_TOPIC "topic" +#define PUBSUB_PUBLISHER_SCOPE "scope" #define PUBSUB_PUBLISHER_CONFIG "pubsub.config" #define PUBSUB_PUBLISHER_SCOPE_DEFAULT "default" http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_api/include/pubsub/subscriber.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_api/include/pubsub/subscriber.h b/pubsub/pubsub_api/include/pubsub/subscriber.h index 5d87b8a..ca6d4d1 100644 --- a/pubsub/pubsub_api/include/pubsub/subscriber.h +++ b/pubsub/pubsub_api/include/pubsub/subscriber.h @@ -33,9 +33,8 @@ #define PUBSUB_SUBSCRIBER_SERVICE_VERSION "2.0.0" //properties -#define PUBSUB_SUBSCRIBER_TOPIC "pubsub.topic" -#define PUBSUB_SUBSCRIBER_SCOPE "pubsub.scope" -#define PUBSUB_SUBSCRIBER_STRATEGY "pubsub.strategy" +#define PUBSUB_SUBSCRIBER_TOPIC "topic" +#define PUBSUB_SUBSCRIBER_SCOPE "scope" #define PUBSUB_SUBSCRIBER_CONFIG "pubsub.config" #define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT "default" http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_discovery/src/etcd_watcher.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/src/etcd_watcher.c b/pubsub/pubsub_discovery/src/etcd_watcher.c index 726269a..fe80c14 100644 --- a/pubsub/pubsub_discovery/src/etcd_watcher.c +++ b/pubsub/pubsub_discovery/src/etcd_watcher.c @@ -127,18 +127,18 @@ celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsu char scope[MAX_FIELD_LENGTH]; char topic[MAX_FIELD_LENGTH]; char fwUUID[MAX_FIELD_LENGTH]; - char serviceId[MAX_FIELD_LENGTH]; + char pubsubUUID[MAX_FIELD_LENGTH]; memset(rootPath,0,MAX_ROOTNODE_LENGTH); memset(topic,0,MAX_FIELD_LENGTH); memset(fwUUID,0,MAX_FIELD_LENGTH); - memset(serviceId,0,MAX_FIELD_LENGTH); + memset(pubsubUUID,0,MAX_FIELD_LENGTH); etcdWatcher_getRootPath(pubsub_discovery->context, rootPath); asprintf(&expr, "/%s/%%[^/]/%%[^/]/%%[^/]/%%[^/].*", rootPath); if(expr) { - int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, serviceId); + int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, pubsubUUID); free(expr); if (foundItems != 4) { // Could happen when a directory is removed, just don't process this. status = CELIX_ILLEGAL_STATE; @@ -149,50 +149,28 @@ celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsu json_error_t error; json_t* jsonRoot = json_loads(etcdValue, JSON_DECODE_ANY, &error); - const char* endpoint_serializer = NULL; - const char* endpoint_admin_type = NULL; - const char* endpoint_url = NULL; - const char* endpoint_type = NULL; + properties_t *discovered_props = properties_create(); - if (json_is_object(jsonRoot)){ + if (json_is_object(jsonRoot)) { - void *iter = json_object_iter(jsonRoot); + void *iter = json_object_iter(jsonRoot); - const char *key; - json_t *value; + const char *key; + json_t *value; - while (iter) { - key = json_object_iter_key(iter); - value = json_object_iter_value(iter); + while (iter) { + key = json_object_iter_key(iter); + value = json_object_iter_value(iter); - if (strcmp(key, PUBSUB_ENDPOINT_SERIALIZER) == 0) { - endpoint_serializer = json_string_value(value); - } else if (strcmp(key, PUBSUB_ENDPOINT_ADMIN_TYPE) == 0) { - endpoint_admin_type = json_string_value(value); - } else if (strcmp(key, PUBSUB_ENDPOINT_URL) == 0) { - endpoint_url = json_string_value(value); - } else if (strcmp(key, PUBSUB_ENDPOINT_TYPE) == 0) { - endpoint_type = json_string_value(value); - } - - iter = json_object_iter_next(jsonRoot, iter); - } - - if (endpoint_url == NULL) { - printf("EW: No endpoint found in json object!\n"); - endpoint_url = etcdValue; - } - - } else { - endpoint_url = etcdValue; - } + properties_set(discovered_props, key, json_string_value(value)); + iter = json_object_iter_next(jsonRoot, iter); + } + } - status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),endpoint_url,NULL,pubEP); - if (status == CELIX_SUCCESS) { - status += pubsubEndpoint_setField(*pubEP, PUBSUB_ENDPOINT_SERIALIZER, endpoint_serializer); - status += pubsubEndpoint_setField(*pubEP, PUBSUB_ENDPOINT_ADMIN_TYPE, endpoint_admin_type); - status += pubsubEndpoint_setField(*pubEP, PUBSUB_ENDPOINT_TYPE, endpoint_type); + status = pubsubEndpoint_createFromDiscoveredProperties(discovered_props, pubEP); + if (status != CELIX_SUCCESS) { + properties_destroy(discovered_props); } if (jsonRoot != NULL) { http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_discovery/src/etcd_writer.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/src/etcd_writer.c b/pubsub/pubsub_discovery/src/etcd_writer.c index e820e50..37220cc 100644 --- a/pubsub/pubsub_discovery/src/etcd_writer.c +++ b/pubsub/pubsub_discovery/src/etcd_writer.c @@ -85,9 +85,9 @@ void etcdWriter_destroy(etcd_writer_pt writer) { memset(dir,0,MAX_ROOTNODE_LENGTH); snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s", rootPath, - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), - properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID)); + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID)); etcd_del(dir); pubsubEndpoint_destroy(pubEP); @@ -106,7 +106,7 @@ celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_end if(storeEP){ const char *fwUUID = NULL; bundleContext_getProperty(writer->pubsub_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); - if(fwUUID && strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) { + if(fwUUID && strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) { celixThreadMutex_lock(&writer->localPubsLock); pubsub_endpoint_pt p = NULL; pubsubEndpoint_clone(pubEP, &p); @@ -134,32 +134,28 @@ celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_end const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context); - asprintf(&key,"%s/%s/%s/%s/%ld", + asprintf(&key,"%s/%s/%s/%s/%s", rootPath, - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), - properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), - pubEP->serviceID); - - char serviceID [sizeof(pubEP->serviceID)]; - snprintf(serviceID, sizeof(pubEP->serviceID), "%ld", pubEP->serviceID); - json_t* jsonEndpoint = json_pack("{s:s,s:s,s:s,s:s,s:s,s:s,s:s}", - PUBSUB_ENDPOINT_SERVICE_ID, serviceID, - PUBSUB_ENDPOINT_SERIALIZER, "serializer.json", //TODO: Serializer not (yet) stored in endpoint - PUBSUB_ENDPOINT_ADMIN_TYPE, "zmq", //TODO: PSA type not (yet) stored in endpoint - PUBSUB_ENDPOINT_URL, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL), - PUBSUB_ENDPOINT_TYPE, "publisher", //TODO: Check if necessary - PUBSUB_ENDPOINT_TOPIC, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), - PUBSUB_ENDPOINT_SCOPE, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE) - ); - char* jsonEndpointStr = json_dumps(jsonEndpoint, JSON_COMPACT); + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID)); + + + json_t *jsEndpoint = json_object(); + const char* propKey = NULL; + PROPERTIES_FOR_EACH(pubEP->endpoint_props, propKey) { + const char* val = properties_get(pubEP->endpoint_props, propKey); + json_t* jsVal = json_string(val); + json_object_set(jsEndpoint, propKey, jsVal); + } + char* jsonEndpointStr = json_dumps(jsEndpoint, JSON_COMPACT); if (!etcd_set(key,jsonEndpointStr,ttl,false)) { status = CELIX_ILLEGAL_ARGUMENT; } - FREE_MEM(key); FREE_MEM(jsonEndpointStr); - json_decref(jsonEndpoint); + json_decref(jsEndpoint); return status; } @@ -170,12 +166,12 @@ celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_ const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context); - asprintf(&key, "%s/%s/%s/%s/%ld", + asprintf(&key, "%s/%s/%s/%s/%s", rootPath, - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), - properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), - pubEP->serviceID); + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID)); celixThreadMutex_lock(&writer->localPubsLock); for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) { http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_discovery/src/psd_activator.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/src/psd_activator.c b/pubsub/pubsub_discovery/src/psd_activator.c index 89a517d..ad1cc4a 100644 --- a/pubsub/pubsub_discovery/src/psd_activator.c +++ b/pubsub/pubsub_discovery/src/psd_activator.c @@ -29,7 +29,6 @@ #include "pubsub_common.h" #include "publisher_endpoint_announce.h" -#include "pubsub_discovery.h" #include "pubsub_discovery_impl.h" struct activator { http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c index e3e9704..0c075fc 100644 --- a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c +++ b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c @@ -42,6 +42,8 @@ #include "pubsub_endpoint.h" #include "pubsub_discovery_impl.h" +static bool pubsub_discovery_isEndpointValid(pubsub_endpoint_pt psEp); + /* Discovery activator functions */ celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt *ps_discovery) { celix_status_t status = CELIX_SUCCESS; @@ -56,11 +58,18 @@ celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discove (*ps_discovery)->discoveredPubs = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); (*ps_discovery)->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL); (*ps_discovery)->watchers = hashMap_create(utils_stringHash,NULL,utils_stringEquals, NULL); + (*ps_discovery)->verbose = PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE; celixThreadMutex_create(&(*ps_discovery)->listenerReferencesMutex, NULL); celixThreadMutex_create(&(*ps_discovery)->discoveredPubsMutex, NULL); celixThreadMutex_create(&(*ps_discovery)->watchersMutex, NULL); } + const char *verboseStr = NULL; + bundleContext_getProperty(context, PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY, &verboseStr); + if (verboseStr != NULL) { + (*ps_discovery)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0; + } + return status; } @@ -119,7 +128,7 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) { bundleContext_getProperty(ps_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); if (fwUUID == NULL) { - printf("PSD: Cannot retrieve fwUUID.\n"); + fprintf(stderr, "ERROR PSD: Cannot retrieve fwUUID.\n"); return CELIX_INVALID_BUNDLE_CONTEXT; } @@ -143,7 +152,7 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) { int i; for (i = 0; i < arrayList_size(pubEP_list); i++) { pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) arrayList_get(pubEP_list, i); - if (strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) { + if (strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) { etcdWriter_deletePublisherEndpoint(ps_discovery->writer, pubEP); } else { pubsub_discovery_informPublishersListeners(ps_discovery, pubEP, false); @@ -174,10 +183,17 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) { celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) { celix_status_t status = CELIX_SUCCESS; - bool inform=false; + + bool valid = pubsub_discovery_isEndpointValid(pubEP); + if (!valid) { + status = CELIX_ILLEGAL_STATE; + return status; + } + + bool inform = false; celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); - char *pubs_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + char *pubs_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pubs_key); if(pubEP_list==NULL){ arrayList_create(&pubEP_list); @@ -216,12 +232,12 @@ celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt pubsub_discovery, bool found = false; celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); - char *pubs_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + char *pubs_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); array_list_pt pubEP_list = (array_list_pt) hashMap_get(pubsub_discovery->discoveredPubs, pubs_key); free(pubs_key); if (pubEP_list == NULL) { - printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n", - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + printf("WARNING PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n", + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); status = CELIX_ILLEGAL_STATE; } else { int i; @@ -279,14 +295,25 @@ celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt pu /* Service's functions implementation */ celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP) { celix_status_t status = CELIX_SUCCESS; - printf("pubsub_discovery_announcePublisher : %s / %s\n", - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle; + bool valid = pubsub_discovery_isEndpointValid(pubEP); + if (!valid) { + status = CELIX_ILLEGAL_ARGUMENT; + return status; + } + + if (pubsub_discovery->verbose) { + printf("pubsub_discovery_announcePublisher : %s / %s\n", + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + + + celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); - char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key); if(pubEP_list==NULL){ @@ -308,16 +335,21 @@ celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_ celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP) { celix_status_t status = CELIX_SUCCESS; - pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle; + bool valid = pubsub_discovery_isEndpointValid(pubEP); + if (!valid) { + status = CELIX_ILLEGAL_ARGUMENT; + return status; + } + celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); - char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key); free(pub_key); if(pubEP_list==NULL){ - printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + printf("WARNING PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); status = CELIX_ILLEGAL_STATE; } else{ @@ -332,7 +364,7 @@ celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt } if(!found){ - printf("PSD: Trying to remove a not existing endpoint. Something is not consistent.\n"); + printf("WARNING PSD: Trying to remove a not existing endpoint. Something is not consistent.\n"); status = CELIX_ILLEGAL_STATE; } else{ @@ -353,7 +385,7 @@ celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic) { pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle; - char *scope_topic_key = createScopeTopicKey(scope, topic); + char *scope_topic_key = pubsubEndpoint_createScopeTopicKey(scope, topic); celixThreadMutex_lock(&pubsub_discovery->watchersMutex); struct watcher_info * wi = hashMap_get(pubsub_discovery->watchers, scope_topic_key); if(wi) { @@ -374,7 +406,7 @@ celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scop celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic) { pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle; - char *scope_topic_key = createScopeTopicKey(scope, topic); + char *scope_topic_key = pubsubEndpoint_createScopeTopicKey(scope, topic); celixThreadMutex_lock(&pubsub_discovery->watchersMutex); hash_map_entry_pt entry = hashMap_getEntry(pubsub_discovery->watchers, scope_topic_key); @@ -426,7 +458,9 @@ celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_ celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex); celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); - printf("PSD: pubsub_tm_announce_publisher added.\n"); + if (pubsub_discovery->verbose) { + printf("PSD: pubsub_tm_announce_publisher added.\n"); + } return status; } @@ -450,7 +484,9 @@ celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, servic if (pubsub_discovery->listenerReferences != NULL) { if (hashMap_remove(pubsub_discovery->listenerReferences, reference)) { - printf("PSD: pubsub_tm_announce_publisher removed.\n"); + if (pubsub_discovery->verbose) { + printf("PSD: pubsub_tm_announce_publisher removed.\n"); + } } } celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex); @@ -458,3 +494,38 @@ celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, servic return status; } +static bool pubsub_discovery_isEndpointValid(pubsub_endpoint_pt psEp) { + //required properties + bool valid = true; + static const char* keys[] = { + PUBSUB_ENDPOINT_UUID, + PUBSUB_ENDPOINT_FRAMEWORK_UUID, + PUBSUB_ENDPOINT_TYPE, + PUBSUB_ENDPOINT_ADMIN_TYPE, + PUBSUB_ENDPOINT_SERIALIZER, + PUBSUB_ENDPOINT_TOPIC_NAME, + PUBSUB_ENDPOINT_TOPIC_SCOPE, + NULL }; + int i; + for (i = 0; keys[i] != NULL; ++i) { + const char *val = properties_get(psEp->endpoint_props, keys[i]); + if (val == NULL) { //missing required key + fprintf(stderr, "[ERROR] PSD: Invalid endpoint missing key: '%s'\n", keys[i]); + valid = false; + } + } + if (!valid) { + const char *key = NULL; + fprintf(stderr, "PubSubEndpoint entries:\n"); + PROPERTIES_FOR_EACH(psEp->endpoint_props, key) { + fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->endpoint_props, key)); + } + if (psEp->topic_props != NULL) { + fprintf(stderr, "PubSubEndpoint topic properties entries:\n"); + PROPERTIES_FOR_EACH(psEp->topic_props, key) { + fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->topic_props, key)); + } + } + } + return valid; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h index 676a6ab..3d25b1c 100644 --- a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h +++ b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h @@ -29,12 +29,15 @@ #define FREE_MEM(ptr) if(ptr) {free(ptr); ptr = NULL;} +#define PUBSUB_ETCD_DISCOVERY_VERBOSE_KEY "PUBSUB_ETCD_DISCOVERY_VERBOSE" +#define PUBSUB_ETCD_DISCOVERY_DEFAULT_VERBOSE false + struct watcher_info { etcd_watcher_pt watcher; int nr_references; }; -struct pubsub_discovery { +typedef struct pubsub_discovery { bundle_context_pt context; celix_thread_mutex_t discoveredPubsMutex; @@ -47,7 +50,11 @@ struct pubsub_discovery { hash_map_pt watchers; //key = topicname, value = struct watcher_info etcd_writer_pt writer; -}; + + bool verbose; +} pubsub_discovery_t; + +typedef struct pubsub_discovery *pubsub_discovery_pt; celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt* node_discovery); http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_serializer_json/src/ps_activator.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_serializer_json/src/ps_activator.c b/pubsub/pubsub_serializer_json/src/ps_activator.c index fec5892..32dd1fc 100644 --- a/pubsub/pubsub_serializer_json/src/ps_activator.c +++ b/pubsub/pubsub_serializer_json/src/ps_activator.c @@ -28,6 +28,7 @@ #include "bundle_activator.h" #include "service_registration.h" +#include "pubsub_constants.h" #include "pubsub_serializer_impl.h" @@ -70,7 +71,7 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) /* Set serializer type */ properties_pt props = properties_create(); - properties_set(props,PUBSUB_SERIALIZER_TYPE_KEY,PUBSUB_SERIALIZER_TYPE); + properties_set(props, PUBSUB_SERIALIZER_TYPE_KEY, PUBSUB_SERIALIZER_TYPE); status = bundleContext_registerService(context, PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc, props, &activator->registration); http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/publisher_endpoint_announce.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/include/publisher_endpoint_announce.h b/pubsub/pubsub_spi/include/publisher_endpoint_announce.h index bd39fc0..607e83a 100644 --- a/pubsub/pubsub_spi/include/publisher_endpoint_announce.h +++ b/pubsub/pubsub_spi/include/publisher_endpoint_announce.h @@ -22,6 +22,10 @@ #include "pubsub_endpoint.h" + +//TODO refactor to pubsub_endpoint_announce +//can be used to announce and remove publisher and subscriber endpoints + struct publisher_endpoint_announce { void *handle; celix_status_t (*announcePublisher)(void *handle, pubsub_endpoint_pt pubEP); http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_admin.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/include/pubsub_admin.h b/pubsub/pubsub_spi/include/pubsub_admin.h index f24d825..5379415 100644 --- a/pubsub/pubsub_spi/include/pubsub_admin.h +++ b/pubsub/pubsub_spi/include/pubsub_admin.h @@ -32,11 +32,8 @@ #include "pubsub_common.h" #include "pubsub_endpoint.h" -#define PSA_IP "PSA_IP" -#define PSA_ITF "PSA_INTERFACE" -#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX" +#include "pubsub_constants.h" -#define PUBSUB_ADMIN_TYPE_KEY "pubsub_admin.type" typedef struct pubsub_admin *pubsub_admin_pt; @@ -52,19 +49,20 @@ struct pubsub_admin_service { celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* scope, char* topic); celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* scope, char* topic); + //TODO add match function for subscription service and publication listeners, e.g.: + //matchPublisherListener(admin, bundle, filter, outScore) + //matchSubscriberService(admin, svcRef, outScore) + /* Match principle: - * - A full matching pubsub_admin gives 200 points - * - A full matching serializer gives 100 points - * - If QoS = sample - * - fallback pubsub_admin order of selection is: udp_mc, zmq. Points allocation is 100,75. - * - fallback serializers order of selection is: json, void. Points allocation is 30,20. - * - If QoS = control - * - fallback pubsub_admin order of selection is: zmq,udp_mc. Points allocation is 100,75. - * - fallback serializers order of selection is: json, void. Points allocation is 30,20. - * - If nothing is specified, QoS = sample is assumed, so the same score applies, just divided by two. - * + * - A full matching pubsub_admin gives 100 points */ + //TODO this should only be called for remote endpoints (e.g. not endpoints from this framework celix_status_t (*matchEndpoint)(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score); + + //TODO redesign add function for handling endpoint seperate, e.g.: + //addEndpoint(admin, endpoint); + //note that endpoints can be subscribers and publishers + //Also note that we than can have pending subscribers and pending (subscriber/publisher) endpoints. }; typedef struct pubsub_admin_service *pubsub_admin_service_pt; http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_admin_match.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/include/pubsub_admin_match.h b/pubsub/pubsub_spi/include/pubsub_admin_match.h index e95ca7d..08d6582 100644 --- a/pubsub/pubsub_spi/include/pubsub_admin_match.h +++ b/pubsub/pubsub_spi/include/pubsub_admin_match.h @@ -31,10 +31,17 @@ #define QOS_TYPE_SAMPLE "sample" /* A.k.a. unreliable connection */ #define QOS_TYPE_CONTROL "control" /* A.k.a. reliable connection */ -#define PUBSUB_ADMIN_FULL_MATCH_SCORE 200.0F -#define SERIALIZER_FULL_MATCH_SCORE 100.0F - -celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char *pubsub_admin_type, array_list_pt serializerList, double *score); -celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, pubsub_serializer_service_t **serSvc); +#define PUBSUB_ADMIN_FULL_MATCH_SCORE 100.0F + +celix_status_t pubsub_admin_match( + pubsub_endpoint_pt endpoint, + const char *pubsub_admin_type, + const char *frameworkUuid, + double sampleScore, + double controlScore, + double defaultScore, + array_list_pt serializerList, + double *score); +celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, service_reference_pt *out); #endif /* PUBSUB_ADMIN_MATCH_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_constants.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/include/pubsub_constants.h b/pubsub/pubsub_spi/include/pubsub_constants.h new file mode 100644 index 0000000..47e31d3 --- /dev/null +++ b/pubsub/pubsub_spi/include/pubsub_constants.h @@ -0,0 +1,30 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#ifndef PUBSUB_CONSTANTS_H_ +#define PUBSUB_CONSTANTS_H_ + +#define PSA_IP "PSA_IP" +#define PSA_ITF "PSA_INTERFACE" +#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX" + +#define PUBSUB_ADMIN_TYPE_KEY "pubsub.config" +#define PUBSUB_SERIALIZER_TYPE_KEY "pubsub.serializer.type" + +#endif /* PUBSUB_CONSTANTS_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_endpoint.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/include/pubsub_endpoint.h b/pubsub/pubsub_spi/include/pubsub_endpoint.h index 598d673..c0492f5 100644 --- a/pubsub/pubsub_spi/include/pubsub_endpoint.h +++ b/pubsub/pubsub_spi/include/pubsub_endpoint.h @@ -34,32 +34,48 @@ #include "pubsub/publisher.h" #include "pubsub/subscriber.h" -#define PUBSUB_ENDPOINT_ID "pubsub.endpoint.id" -#define PUBSUB_ENDPOINT_SERVICE_ID "service.id" -#define PUBSUB_ENDPOINT_SERIALIZER "serializer" -#define PUBSUB_ENDPOINT_ADMIN_TYPE "pubsub.admin.type" -#define PUBSUB_ENDPOINT_URL "pubsub.endpoint" -#define PUBSUB_ENDPOINT_TOPIC "pubsub.topic" -#define PUBSUB_ENDPOINT_SCOPE "pubsub.scope" -#define PUBSUB_ENDPOINT_TYPE "pubsub.type" +#include "pubsub_constants.h" + +//required for valid endpoint +#define PUBSUB_ENDPOINT_UUID "pubsub.endpoint.uuid" //required +#define PUBSUB_ENDPOINT_FRAMEWORK_UUID "pubsub.framework.uuid" //required +#define PUBSUB_ENDPOINT_TYPE "pubsub.endpoint.type" //PUBSUB_PUBLISHER_ENDPOINT_TYPE or PUBSUB_SUBSCRIBER_ENDPOINT_TYPE +#define PUBSUB_ENDPOINT_ADMIN_TYPE PUBSUB_ADMIN_TYPE_KEY +#define PUBSUB_ENDPOINT_SERIALIZER PUBSUB_SERIALIZER_TYPE_KEY +#define PUBSUB_ENDPOINT_TOPIC_NAME "pubsub.topic.name" +#define PUBSUB_ENDPOINT_TOPIC_SCOPE "pubsub.topic.scope" + +//optional +#define PUBSUB_ENDPOINT_SERVICE_ID "pubsub.service.id" +#define PUBSUB_ENDPOINT_BUNDLE_ID "pubsub.bundle.id" +#define PUBSUB_ENDPOINT_URL "pubsub.url" + + +#define PUBSUB_PUBLISHER_ENDPOINT_TYPE "pubsub.publisher" +#define PUBSUB_SUBSCRIBER_ENDPOINT_TYPE "pubsub.subscriber" + struct pubsub_endpoint { - long serviceID; //optional - bool is_secure; //optional properties_pt endpoint_props; properties_pt topic_props; }; typedef struct pubsub_endpoint *pubsub_endpoint_pt; -celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp); -celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference,pubsub_endpoint_pt* psEp, bool isPublisher); -celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher); +celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long bundleId, long serviceId, const char* endpoint, const char* pubsubType, properties_pt topic_props, pubsub_endpoint_pt* psEp); +celix_status_t pubsubEndpoint_createFromServiceReference(bundle_context_t* ctx, service_reference_pt reference, bool isPublisher, pubsub_endpoint_pt* out); +celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t* ctx, listener_hook_info_pt info, bool isPublisher, pubsub_endpoint_pt* out); celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out); -celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp); +void pubsubEndpoint_destroy(pubsub_endpoint_pt psEp); bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2); celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, const char* value); -char *createScopeTopicKey(const char* scope, const char* topic); +/** + * Creates a pubsub_endpoint based on discovered properties. + * Will take ownership over the discovredProperties + */ +celix_status_t pubsubEndpoint_createFromDiscoveredProperties(properties_t *discoveredProperties, pubsub_endpoint_pt* out); + +char * pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic); #endif /* PUBSUB_ENDPOINT_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_serializer.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/include/pubsub_serializer.h b/pubsub/pubsub_spi/include/pubsub_serializer.h index 4489fa4..a91e820 100644 --- a/pubsub/pubsub_spi/include/pubsub_serializer.h +++ b/pubsub/pubsub_spi/include/pubsub_serializer.h @@ -32,8 +32,6 @@ #include "pubsub_common.h" -#define PUBSUB_SERIALIZER_TYPE_KEY "pubsub_serializer.type" - /** * There should be a pubsub_serializer_t * per msg type (msg id) per bundle http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/include/pubsub_utils.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/include/pubsub_utils.h b/pubsub/pubsub_spi/include/pubsub_utils.h index eb961c9..e4c08ec 100644 --- a/pubsub/pubsub_spi/include/pubsub_utils.h +++ b/pubsub/pubsub_spi/include/pubsub_utils.h @@ -30,10 +30,8 @@ #include "bundle_context.h" #include "array_list.h" -char* pubsub_getScopeFromFilter(const char* bundle_filter); -char* pubsub_getTopicFromFilter(const char* bundle_filter); +celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, const char **topic, const char **scope); char* pubsub_getKeysBundleDir(bundle_context_pt ctx); -array_list_pt pubsub_getTopicsFromString(const char* string); #endif /* PUBSUB_UTILS_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/src/pubsub_admin_match.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/src/pubsub_admin_match.c b/pubsub/pubsub_spi/src/pubsub_admin_match.c index cfe1dad..5d0fcc9 100644 --- a/pubsub/pubsub_spi/src/pubsub_admin_match.c +++ b/pubsub/pubsub_spi/src/pubsub_admin_match.c @@ -19,312 +19,151 @@ #include <string.h> +#include <limits.h> + #include "service_reference.h" #include "pubsub_admin.h" #include "pubsub_admin_match.h" - -#define KNOWN_PUBSUB_ADMIN_NUM 2 -#define KNOWN_SERIALIZER_NUM 2 - -static char* qos_sample_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = {"udp_mc","zmq"}; -static char* qos_sample_serializer_prio_list[KNOWN_SERIALIZER_NUM] = {"json","void"}; - -static char* qos_control_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = {"zmq","udp_mc"}; -static char* qos_control_serializer_prio_list[KNOWN_SERIALIZER_NUM] = {"json","void"}; - -static double qos_pubsub_admin_score[KNOWN_PUBSUB_ADMIN_NUM] = {100.0F,75.0F}; -static double qos_serializer_score[KNOWN_SERIALIZER_NUM] = {30.0F,20.0F}; - -static void get_serializer_type(service_reference_pt svcRef, char **serializerType); -static void manage_service_from_reference(service_reference_pt svcRef, void **svc, bool getService); - -celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char *pubsub_admin_type, array_list_pt serializerList, double *score){ +#include "constants.h" + +/* + * Match can be called by + * a) a local registered pubsub_subscriber service + * b) a local opened service tracker for a pubsub_publisher service + * c) a remote found publisher endpoint + * Note subscribers are not (yet) dicovered remotely + */ +celix_status_t pubsub_admin_match( + pubsub_endpoint_pt endpoint, + const char *pubsub_admin_type, + const char *frameworkUuid, + double sampleScore, + double controlScore, + double defaultScore, + array_list_pt serializerList, + double *out) { celix_status_t status = CELIX_SUCCESS; - double final_score = 0; - int i = 0, j = 0; + double score = 0; + + const char *endpointFrameworkUuid = NULL; + const char *endpointAdminType = NULL; const char *requested_admin_type = NULL; - const char *requested_serializer_type = NULL; const char *requested_qos_type = NULL; - if(endpoint_props!=NULL){ - requested_admin_type = properties_get(endpoint_props,PUBSUB_ADMIN_TYPE_KEY); - requested_serializer_type = properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY); - requested_qos_type = properties_get(endpoint_props,QOS_ATTRIBUTE_KEY); + if (endpoint->endpoint_props != NULL) { + endpointFrameworkUuid = properties_get(endpoint->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID); + endpointAdminType = properties_get(endpoint->endpoint_props, PUBSUB_ENDPOINT_ADMIN_TYPE); } - - /* Analyze the pubsub_admin */ - if(requested_admin_type != NULL){ /* We got precise specification on the pubsub_admin we want */ - if(strncmp(requested_admin_type,pubsub_admin_type,strlen(pubsub_admin_type))==0){ //Full match - final_score += PUBSUB_ADMIN_FULL_MATCH_SCORE; - } - } - else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected PSA */ - if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){ - for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){ - if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){ - final_score += qos_pubsub_admin_score[i]; - break; - } - } - } - else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){ - for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){ - if(strncmp(qos_control_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){ - final_score += qos_pubsub_admin_score[i]; - break; - } - } - } - else{ - printf("Unknown QoS type '%s'\n",requested_qos_type); - status = CELIX_ILLEGAL_ARGUMENT; - } - } - else{ /* We got no specification: fallback to Qos=Sample, but count half the score */ - for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){ - if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){ - final_score += (qos_pubsub_admin_score[i]/2); - break; - } - } + if (endpoint->topic_props != NULL) { + requested_admin_type = properties_get(endpoint->topic_props, PUBSUB_ADMIN_TYPE_KEY); + requested_qos_type = properties_get(endpoint->topic_props, QOS_ATTRIBUTE_KEY); } - char *serializer_type = NULL; - /* Analyze the serializers */ - if(requested_serializer_type != NULL){ /* We got precise specification on the serializer we want */ - for(i=0;i<arrayList_size(serializerList);i++){ - service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,i); - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){ - final_score += SERIALIZER_FULL_MATCH_SCORE; - break; - } - } - } - } - else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected serializer */ - if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){ - bool ser_found = false; - for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ - for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ - service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j); - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ - ser_found = true; - } - } - } - if(ser_found){ - final_score += qos_serializer_score[i]; - } + if (endpointFrameworkUuid != NULL && frameworkUuid != NULL && strncmp(frameworkUuid, endpointFrameworkUuid, 128) == 0) { + //match for local subscriber or publisher + + /* Analyze the pubsub_admin */ + if (requested_admin_type != NULL) { /* We got precise specification on the pubsub_admin we want */ + if (strncmp(requested_admin_type, pubsub_admin_type, strlen(pubsub_admin_type)) == 0) { //Full match + score = PUBSUB_ADMIN_FULL_MATCH_SCORE; } - } - else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){ - bool ser_found = false; - for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ - for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ - service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j); - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ - ser_found = true; - } - } - } - if(ser_found){ - final_score += qos_serializer_score[i]; - } + } else if (requested_qos_type != NULL) { /* We got QoS specification that will determine the selected PSA */ + if (strncmp(requested_qos_type, QOS_TYPE_SAMPLE, strlen(QOS_TYPE_SAMPLE)) == 0) { + score = sampleScore; + } else if (strncmp(requested_qos_type, QOS_TYPE_CONTROL, strlen(QOS_TYPE_CONTROL)) == 0) { + score += controlScore; + } else { + printf("Unknown QoS type '%s'\n", requested_qos_type); + status = CELIX_ILLEGAL_ARGUMENT; } + } else { /* We got no specification: fallback to default score */ + score = defaultScore; } - else{ - printf("Unknown QoS type '%s'\n",requested_qos_type); - status = CELIX_ILLEGAL_ARGUMENT; + + //NOTE serializer influence the score if a specific serializer is configured and not available. + //get best serializer. This is based on service raking or requested serializer. In the case of a request NULL is return if not request match is found. + service_reference_pt serSvcRef = NULL; + pubsub_admin_get_best_serializer(endpoint->topic_props, serializerList, &serSvcRef); + const char *serType = NULL; //for printing info + if (serSvcRef == NULL) { + score = 0; + } else { + serviceReference_getProperty(serSvcRef, PUBSUB_SERIALIZER_TYPE_KEY, &serType); } - } - else{ /* We got no specification: fallback to Qos=Sample, but count half the score */ - bool ser_found = false; - for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ - for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ - service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j); - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ - ser_found = true; - } - } - } - if(ser_found){ - final_score += (qos_serializer_score[i]/2); - } + + printf("Score for psa type %s is %f. Serializer used is '%s'\n", pubsub_admin_type, score, serType); + } else { + //remote publisher. score will be 0 or 100. nothing else. + //TODO FIXME remote publisher should go through a different process. Currently it is confusing what to match + if (endpointAdminType == NULL) { + score = 0; + +// const char *key = NULL; +// printf("Endpoint properties:\n"); +// PROPERTIES_FOR_EACH(endpoint->endpoint_props, key) { +// printf("\t%s=%s\n", key, properties_get(endpoint->endpoint_props, key)); +// } + + fprintf(stderr, "WARNING PSA MATCH: remote publisher has no type. The key '%s' must be specified\n", PUBSUB_ENDPOINT_ADMIN_TYPE); + } else { + score = strncmp(endpointAdminType, pubsub_admin_type, 1024) == 0 ? 100 : 0; } + printf("Score for psa type %s is %f. Publisher is remote\n", pubsub_admin_type, score); } - *score = final_score; - printf("Score for pair <%s,%s> = %f\n",pubsub_admin_type,serializer_type,final_score); + *out = score; return status; } -celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, pubsub_serializer_service_t **serSvc){ +celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, service_reference_pt *out){ celix_status_t status = CELIX_SUCCESS; - - int i = 0, j = 0; - + int i; const char *requested_serializer_type = NULL; - const char *requested_qos_type = NULL; if (endpoint_props != NULL){ requested_serializer_type = properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY); - requested_qos_type = properties_get(endpoint_props,QOS_ATTRIBUTE_KEY); } service_reference_pt svcRef = NULL; - void *svc = NULL; - - /* Analyze the serializers */ - if (arrayList_size(serializerList) == 1) { - // Only one serializer, use this one - svcRef = (service_reference_pt)arrayList_get(serializerList,0); - manage_service_from_reference(svcRef, &svc, true); - *serSvc = svc; - char *serializer_type = NULL; - get_serializer_type(svcRef, &serializer_type); - printf("Selected the only serializer available. Type = %s\n", serializer_type); - - } - else if(requested_serializer_type != NULL){ /* We got precise specification on the serializer we want */ - for(i=0;i<arrayList_size(serializerList);i++){ - svcRef = (service_reference_pt)arrayList_get(serializerList,i); - char *serializer_type = NULL; - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){ - manage_service_from_reference(svcRef, &svc,true); - if(svc==NULL){ - printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); - status = CELIX_SERVICE_EXCEPTION; - } - *serSvc = svc; - break; - } - } - } - } - else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected serializer */ - if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){ - bool ser_found = false; - for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ - for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ - svcRef = (service_reference_pt)arrayList_get(serializerList,j); - char *serializer_type = NULL; - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ - manage_service_from_reference(svcRef, &svc,true); - if(svc==NULL){ - printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); - status = CELIX_SERVICE_EXCEPTION; - } - else{ - *serSvc = svc; - ser_found = true; - printf("Selected %s serializer as best for QoS=%s\n",qos_sample_serializer_prio_list[i],QOS_TYPE_SAMPLE); - } - } - } - } - } - } - else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){ - bool ser_found = false; - for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ - for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ - svcRef = (service_reference_pt)arrayList_get(serializerList,j); - char *serializer_type = NULL; - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ - manage_service_from_reference(svcRef, &svc,true); - if(svc==NULL){ - printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); - status = CELIX_SERVICE_EXCEPTION; - } - else{ - *serSvc = svc; - ser_found = true; - printf("Selected %s serializer as best for QoS=%s\n",qos_control_serializer_prio_list[i],QOS_TYPE_CONTROL); - } - } - } - } - } - } - else{ - printf("Unknown QoS type '%s'\n",requested_qos_type); - status = CELIX_ILLEGAL_ARGUMENT; - } - } - else{ /* We got no specification: fallback to Qos=Sample, but count half the score */ - bool ser_found = false; - for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ - for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ - svcRef = (service_reference_pt)arrayList_get(serializerList,j); - char *serializer_type = NULL; - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ - manage_service_from_reference(svcRef, &svc,true); - if(svc==NULL){ - printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); - status = CELIX_SERVICE_EXCEPTION; - } - else{ - *serSvc = svc; - ser_found = true; - printf("Selected %s serializer as best without any specification\n",qos_sample_serializer_prio_list[i]); - } - } - } + service_reference_pt best = NULL; + long hightestRanking = LONG_MIN; + + if (requested_serializer_type != NULL) { + for (i = 0; i < arrayList_size(serializerList); ++i) { + svcRef = (service_reference_pt) arrayList_get(serializerList, 0); + const char* currentSerType = NULL; + serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY, ¤tSerType); + if (currentSerType != NULL && strncmp(requested_serializer_type, currentSerType, 128) == 0) { + best = svcRef; + break; + } + } + } else { + //no specific serializer request -> search for highest ranking serializer service + for (i = 0; i < arrayList_size(serializerList); ++i) { + svcRef = (service_reference_pt)arrayList_get(serializerList,0); + const char *service_ranking_str = NULL; + const char* currentSerType = NULL; + serviceReference_getProperty(svcRef, OSGI_FRAMEWORK_SERVICE_RANKING, &service_ranking_str); + serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY, ¤tSerType); + long svcRanking = service_ranking_str == NULL ? LONG_MIN : strtol(service_ranking_str, NULL, 10); + if (best == NULL || (svcRanking > hightestRanking && currentSerType != NULL)) { + best = svcRef; + hightestRanking = svcRanking; + } + if (currentSerType == NULL) { + fprintf(stderr, "Invalid pubsub_serializer service. Must have a property '%s'\n", PUBSUB_SERIALIZER_TYPE_KEY); } - } - } - - if(svc!=NULL && svcRef!=NULL){ - manage_service_from_reference(svcRef, svc, false); - } - - return status; -} + } + } -static void get_serializer_type(service_reference_pt svcRef, char **serializerType){ + *out = best; - const char *serType = NULL; - serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY,&serType); - if(serType != NULL){ - *serializerType = (char*)serType; - } - else{ - printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",svcRef); - *serializerType = NULL; - } -} - -static void manage_service_from_reference(service_reference_pt svcRef, void **svc, bool getService){ - bundle_context_pt context = NULL; - bundle_pt bundle = NULL; - serviceReference_getBundle(svcRef, &bundle); - bundle_getContext(bundle, &context); - if(getService){ - bundleContext_getService(context, svcRef, svc); - } - else{ - bundleContext_ungetService(context, svcRef, NULL); - } -} + return status; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/src/pubsub_endpoint.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/src/pubsub_endpoint.c b/pubsub/pubsub_spi/src/pubsub_endpoint.c index d3b746e..1950433 100644 --- a/pubsub/pubsub_spi/src/pubsub_endpoint.c +++ b/pubsub/pubsub_spi/src/pubsub_endpoint.c @@ -38,10 +38,11 @@ #include "pubsub_utils.h" -static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps); +static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long bundleId, long serviceId,const char* endpoint, const char *pubsubType, properties_pt topic_props); static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher); +static bool pubsubEndpoint_isEndpointValid(pubsub_endpoint_pt psEp); -static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps){ +static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long bundleId, long serviceId, const char* endpoint, const char *pubsubType, properties_pt topic_props) { if (psEp->endpoint_props == NULL) { psEp->endpoint_props = properties_create(); @@ -52,33 +53,43 @@ static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID uuid_t endpointUid; uuid_generate(endpointUid); uuid_unparse(endpointUid, endpointUuid); - properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_ID, endpointUuid); + + properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_UUID, endpointUuid); if (fwUUID != NULL) { - properties_set(psEp->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID, fwUUID); + properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID, fwUUID); } if (scope != NULL) { - properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_SCOPE, scope); + properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE, scope); } if (topic != NULL) { - properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC, topic); + properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME, topic); } - psEp->serviceID = serviceId; + char idBuf[32]; + + if (bundleId >= 0) { + snprintf(idBuf, sizeof(idBuf), "%li", bundleId); + properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_BUNDLE_ID, idBuf); + } + + if (serviceId >= 0) { + snprintf(idBuf, sizeof(idBuf), "%li", bundleId); + properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_SERVICE_ID, idBuf); + } if(endpoint != NULL) { properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_URL, endpoint); } - if(topic_props != NULL){ - if(cloneProps){ - properties_copy(topic_props, &(psEp->topic_props)); - } - else{ - psEp->topic_props = topic_props; - } + if (pubsubType != NULL) { + properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TYPE, pubsubType); + } + + if(topic_props != NULL) { + properties_copy(topic_props, &(psEp->topic_props)); } } @@ -129,12 +140,22 @@ celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, c return status; } -celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp){ +celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long bundleId, long serviceId, const char* endpoint, const char* pubsubType, properties_pt topic_props,pubsub_endpoint_pt* out){ celix_status_t status = CELIX_SUCCESS; - *psEp = calloc(1, sizeof(**psEp)); + pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp)); - pubsubEndpoint_setFields(*psEp, fwUUID, scope, topic, serviceId, endpoint, topic_props, true); + pubsubEndpoint_setFields(psEp, fwUUID, scope, topic, bundleId, serviceId, endpoint, pubsubType, topic_props); + + if (!pubsubEndpoint_isEndpointValid(psEp)) { + status = CELIX_ILLEGAL_STATE; + } + + if (status == CELIX_SUCCESS) { + *out = psEp; + } else { + pubsubEndpoint_destroy(psEp); + } return status; @@ -151,9 +172,6 @@ celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *o status += properties_copy(in->topic_props, &(ep->topic_props)); } - ep->serviceID = in->serviceID; - ep->is_secure = in->is_secure; - if (status == CELIX_SUCCESS) { *out = ep; } else { @@ -161,23 +179,18 @@ celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *o } return status; - } -celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* psEp, bool isPublisher){ +celix_status_t pubsubEndpoint_createFromServiceReference(bundle_context_t *ctx, service_reference_pt reference, bool isPublisher, pubsub_endpoint_pt* out){ celix_status_t status = CELIX_SUCCESS; pubsub_endpoint_pt ep = calloc(1,sizeof(*ep)); - bundle_pt bundle = NULL; - bundle_context_pt ctxt = NULL; const char* fwUUID = NULL; - serviceReference_getBundle(reference,&bundle); - bundle_getContext(bundle,&ctxt); - bundleContext_getProperty(ctxt,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); + bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); const char* scope = NULL; - serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_SCOPE,&scope); + serviceReference_getPropertyWithDefault(reference, PUBSUB_SUBSCRIBER_SCOPE, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, &scope); const char* topic = NULL; serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_TOPIC,&topic); @@ -185,70 +198,105 @@ celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt re const char* serviceId = NULL; serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId); + + long bundleId = -1; + bundle_pt bundle = NULL; + serviceReference_getBundle(reference, &bundle); + if (bundle != NULL) { + bundle_getBundleId(bundle, &bundleId); + } + /* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */ properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher); - pubsubEndpoint_setFields(ep, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, strtol(serviceId,NULL,10), NULL, topic_props, false); + const char *pubsubType = isPublisher ? PUBSUB_PUBLISHER_ENDPOINT_TYPE : PUBSUB_SUBSCRIBER_ENDPOINT_TYPE; - if (!properties_get(ep->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID) || - !ep->serviceID || - !properties_get(ep->endpoint_props, PUBSUB_ENDPOINT_SCOPE) || - !properties_get(ep->endpoint_props, PUBSUB_ENDPOINT_TOPIC)) { + pubsubEndpoint_setFields(ep, fwUUID, scope, topic, bundleId, strtol(serviceId,NULL,10), NULL, pubsubType, topic_props); - fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!."); - status = CELIX_BUNDLE_EXCEPTION; - pubsubEndpoint_destroy(ep); - *psEp = NULL; - } - else{ - *psEp = ep; - } + if (!pubsubEndpoint_isEndpointValid(ep)) { + status = CELIX_ILLEGAL_STATE; + } + + if (status == CELIX_SUCCESS) { + *out = ep; + } else { + pubsubEndpoint_destroy(ep); + } return status; } -celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher){ +celix_status_t pubsubEndpoint_createFromDiscoveredProperties(properties_t *discoveredProperties, pubsub_endpoint_pt* out) { + celix_status_t status = CELIX_SUCCESS; + pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp)); + if (psEp != NULL) { + psEp->endpoint_props = discoveredProperties; + } else { + status = CELIX_ENOMEM; + } + + if (!pubsubEndpoint_isEndpointValid(psEp)) { + status = CELIX_ILLEGAL_STATE; + } + + if (status == CELIX_SUCCESS) { + *out = psEp; + } else { + pubsubEndpoint_destroy(psEp); + } + + return status; +} + +celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t *ctx, listener_hook_info_pt info, bool isPublisher, pubsub_endpoint_pt* out){ celix_status_t status = CELIX_SUCCESS; const char* fwUUID=NULL; - bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); + bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); - if(fwUUID==NULL){ + if( fwUUID==NULL) { return CELIX_BUNDLE_EXCEPTION; } - char* topic = pubsub_getTopicFromFilter(info->filter); - if(topic==NULL){ + const char* topic = NULL; + const char* scope = NULL; + pubsub_getPubSubInfoFromFilter(info->filter, &topic, &scope); + + if (topic==NULL) { return CELIX_BUNDLE_EXCEPTION; } - - *psEp = calloc(1, sizeof(**psEp)); - - char* scope = pubsub_getScopeFromFilter(info->filter); if(scope == NULL) { scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT); } + pubsub_endpoint_pt psEp = calloc(1, sizeof(**out)); + bundle_pt bundle = NULL; long bundleId = -1; bundleContext_getBundle(info->context,&bundle); - bundle_getBundleId(bundle,&bundleId); properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher); /* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */ - pubsubEndpoint_setFields(*psEp, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, bundleId, NULL, topic_props, false); + pubsubEndpoint_setFields(psEp, fwUUID, scope, topic, bundleId, -1, NULL, PUBSUB_PUBLISHER_ENDPOINT_TYPE, topic_props); - free(topic); - free(scope); + if (!pubsubEndpoint_isEndpointValid(psEp)) { + status = CELIX_ILLEGAL_STATE; + } + if (status == CELIX_SUCCESS) { + *out = psEp; + } else { + pubsubEndpoint_destroy(psEp); + } return status; } -celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){ +void pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){ + if (psEp == NULL) return; if(psEp->topic_props != NULL){ properties_destroy(psEp->topic_props); @@ -260,23 +308,53 @@ celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){ free(psEp); - return CELIX_SUCCESS; + return; } bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){ - return ((strcmp(properties_get(psEp1->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),properties_get(psEp2->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID))==0) && - (strcmp(properties_get(psEp1->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(psEp2->endpoint_props, PUBSUB_ENDPOINT_SCOPE))==0) && - (strcmp(properties_get(psEp1->endpoint_props, PUBSUB_ENDPOINT_TOPIC),properties_get(psEp2->endpoint_props, PUBSUB_ENDPOINT_TOPIC))==0) && - (psEp1->serviceID == psEp2->serviceID) /*&& - ((psEp1->endpoint==NULL && psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/ - ); + return strcmp(properties_get(psEp1->endpoint_props, PUBSUB_ENDPOINT_UUID),properties_get(psEp2->endpoint_props, PUBSUB_ENDPOINT_UUID)); } -char *createScopeTopicKey(const char* scope, const char* topic) { +char * pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic) { char *result = NULL; asprintf(&result, "%s:%s", scope, topic); return result; } + + +static bool pubsubEndpoint_isEndpointValid(pubsub_endpoint_pt psEp) { + //required properties + bool valid = true; + static const char* keys[] = { + PUBSUB_ENDPOINT_UUID, + PUBSUB_ENDPOINT_FRAMEWORK_UUID, + PUBSUB_ENDPOINT_TYPE, + PUBSUB_ENDPOINT_TOPIC_NAME, + PUBSUB_ENDPOINT_TOPIC_SCOPE, + NULL }; + int i; + for (i = 0; keys[i] != NULL; ++i) { + const char *val = properties_get(psEp->endpoint_props, keys[i]); + if (val == NULL) { //missing required key + fprintf(stderr, "[ERROR] PubSubEndpoint: Invalid endpoint missing key: '%s'\n", keys[i]); + valid = false; + } + } + if (!valid) { + const char *key = NULL; + fprintf(stderr, "PubSubEndpoint entries:\n"); + PROPERTIES_FOR_EACH(psEp->endpoint_props, key) { + fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->endpoint_props, key)); + } + if (psEp->topic_props != NULL) { + fprintf(stderr, "PubSubEndpoint topic properties entries:\n"); + PROPERTIES_FOR_EACH(psEp->topic_props, key) { + fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->topic_props, key)); + } + } + } + return valid; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_spi/src/pubsub_utils.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/src/pubsub_utils.c b/pubsub/pubsub_spi/src/pubsub_utils.c index 53bacb8..daf4e9f 100644 --- a/pubsub/pubsub_spi/src/pubsub_utils.c +++ b/pubsub/pubsub_spi/src/pubsub_utils.c @@ -42,85 +42,43 @@ #define MAX_KEYBUNDLE_LENGTH 256 -char* pubsub_getScopeFromFilter(const char* bundle_filter){ - char* scope = NULL; - - char* filter = strdup(bundle_filter); - - char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS); - if(oc!=NULL){ - oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1; - if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){ - - char* scopes = strstr(filter,PUBSUB_PUBLISHER_SCOPE); - if(scopes!=NULL){ - - scopes+=strlen(PUBSUB_PUBLISHER_SCOPE)+1; - char* bottom=strchr(scopes,')'); - *bottom='\0'; - - scope=strdup(scopes); - } else { - scope=strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT); +celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, const char **topicOut, const char **scopeOut) { + celix_status_t status = CELIX_SUCCESS; + const char *topic = NULL; + const char *scope = NULL; + const char *objectClass = NULL; + celix_filter_t *filter = filter_create(filterstr); + if (filter != NULL) { + if (filter->operand == CELIX_FILTER_OPERAND_AND) { //only and pubsub filter valid (e.g. (&(objectClass=pubsub_publisher)(topic=exmpl)) + array_list_t *attributes = filter->children; + unsigned int i; + unsigned int size = arrayList_size(attributes); + for (i = 0; i < size; ++i) { + filter_t *attr = arrayList_get(attributes, i); + if (attr->operand == CELIX_FILTER_OPERAND_EQUAL) { + if (strncmp(OSGI_FRAMEWORK_OBJECTCLASS, attr->attribute, 128) == 0) { + objectClass = attr->value; + } else if (strncmp(PUBSUB_PUBLISHER_TOPIC, attr->attribute, 128) == 0) { + topic = attr->value; + } else if (strncmp(PUBSUB_PUBLISHER_SCOPE, attr->attribute, 128) == 0) { + scope = attr->value; + } + } } } } - free(filter); - - return scope; -} - -char* pubsub_getTopicFromFilter(const char* bundle_filter){ - - char* topic = NULL; - - char* filter = strdup(bundle_filter); - - char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS); - if(oc!=NULL){ - oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1; - if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){ - - char* topics = strstr(filter,PUBSUB_PUBLISHER_TOPIC); - if(topics!=NULL){ - - topics+=strlen(PUBSUB_PUBLISHER_TOPIC)+1; - char* bottom=strchr(topics,')'); - *bottom='\0'; - - topic=strdup(topics); - - } - } + if (topic != NULL && objectClass != NULL && strncmp(objectClass, PUBSUB_PUBLISHER_SERVICE_NAME, 128) == 0) { + *topicOut = topic; + *scopeOut = scope; + } else { + *topicOut = NULL; + *scopeOut = NULL; } - - free(filter); - - return topic; - + return status; } -array_list_pt pubsub_getTopicsFromString(const char* string){ - - array_list_pt topic_list = NULL; - arrayList_create(&topic_list); - - char* topics = strdup(string); - - char* topic = strtok(topics,",;|# "); - arrayList_add(topic_list,strdup(topic)); - - while( (topic = strtok(NULL,",;|# ")) !=NULL){ - arrayList_add(topic_list,strdup(topic)); - } - - free(topics); - - return topic_list; - -} /** * Loop through all bundles and look for the bundle with the keys inside. http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c index 71a9ad9..5b983d4 100644 --- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c +++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c @@ -110,6 +110,13 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help (*manager)->shellCmdService.handle = *manager; (*manager)->shellCmdService.executeCommand = shellCommand; + (*manager)->verbose = PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE; + const char *verboseStr = NULL; + bundleContext_getProperty(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, &verboseStr); + if (verboseStr != NULL) { + (*manager)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0; + } + properties_pt shellProps = properties_create(); properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "ps_info"); properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "ps_info"); @@ -182,6 +189,8 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_ celixThreadMutex_lock(&manager->subscriptionsLock); hash_map_iterator_pt subscriptionsIterator = hashMapIterator_create(manager->subscriptions); + //TODO FIXME no matching used, should only add unmatched subscribers ? + //NOTE this is a bug which occurs when psa are started after bundles that uses the PSA while (hashMapIterator_hasNext(subscriptionsIterator)) { array_list_pt sub_ep_list = hashMapIterator_nextValue(subscriptionsIterator); for(i=0;i<arrayList_size(sub_ep_list);i++){ @@ -197,6 +206,8 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_ status = celixThreadMutex_lock(&manager->publicationsLock); hash_map_iterator_pt publicationsIterator = hashMapIterator_create(manager->publications); + //TODO FIXME no matching used, should only add unmatched publications ? + //NOTE this is a bug which occurs when psa are started after bundles that uses the PSA while (hashMapIterator_hasNext(publicationsIterator)) { array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator); for(i=0;i<arrayList_size(pub_ep_list);i++){ @@ -252,7 +263,7 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_referenc unsigned int i; for(i=0;i<arrayList_size(pubEP_list);i++){ pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); - if(strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){ + if(strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){ disc->removePublisher(disc->handle,pubEP); } } @@ -297,9 +308,9 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref //subscriber_service_pt subscriber = (subscriber_service_pt)service; pubsub_endpoint_pt sub = NULL; - if(pubsubEndpoint_createFromServiceReference(reference,&sub,false) == CELIX_SUCCESS){ + if(pubsubEndpoint_createFromServiceReference(manager->context, reference,false, &sub) == CELIX_SUCCESS){ celixThreadMutex_lock(&manager->subscriptionsLock); - char *sub_key = createScopeTopicKey(properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key); if(sub_list_by_topic==NULL){ @@ -319,13 +330,13 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref for(j=0;j<arrayList_size(manager->psaList);j++){ pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); psa->matchEndpoint(psa->admin,sub,&score); - if(score>best_score){ /* We have a new winner! */ + if (score > best_score) { /* We have a new winner! */ best_score = score; best_psa = psa; } } - if(best_psa != NULL && best_score>0){ + if (best_psa != NULL && best_score>0) { best_psa->addSubscription(best_psa->admin,sub); } @@ -336,7 +347,7 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); publisher_endpoint_announce_pt disc = NULL; bundleContext_getService(manager->context, disc_sr, (void**) &disc); - disc->interestedInTopic(disc->handle, properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + disc->interestedInTopic(disc->handle, properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); bundleContext_ungetService(manager->context, disc_sr, NULL); } hashMapIterator_destroy(iter); @@ -364,7 +375,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r pubsub_topology_manager_pt manager = handle; pubsub_endpoint_pt subcmp = NULL; - if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) == CELIX_SUCCESS){ + if(pubsubEndpoint_createFromServiceReference(manager->context, reference, false, &subcmp) == CELIX_SUCCESS){ unsigned int j,k; @@ -375,7 +386,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); publisher_endpoint_announce_pt disc = NULL; bundleContext_getService(manager->context, disc_sr, (void**) &disc); - disc->uninterestedInTopic(disc->handle, properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + disc->uninterestedInTopic(disc->handle, properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); bundleContext_ungetService(manager->context, disc_sr, NULL); } hashMapIterator_destroy(iter); @@ -384,7 +395,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r celixThreadMutex_lock(&manager->subscriptionsLock); celixThreadMutex_lock(&manager->psaListLock); - char *sub_key = createScopeTopicKey(properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key); free(sub_key); if(sub_list_by_topic!=NULL){ @@ -404,7 +415,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r if(arrayList_size(sub_list_by_topic)==0){ for(k=0;k<arrayList_size(manager->psaList);k++){ pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); - psa->closeAllSubscriptions(psa->admin, (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + psa->closeAllSubscriptions(psa->admin, (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); } } @@ -451,7 +462,7 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter); for(unsigned int i = 0; i < arrayList_size(pubEP_list); i++) { pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); - if( (strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL)){ + if( (strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL)){ status += disc->announcePublisher(disc->handle,pubEP); } } @@ -469,7 +480,7 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service for(i=0;i<arrayList_size(l);i++){ pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i); - disc->interestedInTopic(disc->handle, properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + disc->interestedInTopic(disc->handle, properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); } } hashMapIterator_destroy(iter); @@ -517,10 +528,10 @@ celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_ listener_hook_info_pt info = arrayList_get(listeners, l_index); pubsub_endpoint_pt pub = NULL; - if(pubsubEndpoint_createFromListenerHookInfo(info, &pub, true) == CELIX_SUCCESS){ + if(pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pub) == CELIX_SUCCESS){ celixThreadMutex_lock(&manager->publicationsLock); - char *pub_key = createScopeTopicKey(properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key); if(pub_list_by_topic==NULL){ arrayList_create(&pub_list_by_topic); @@ -546,7 +557,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_ } } - if(best_psa != NULL && best_score>0){ + if (best_psa != NULL && best_score > 0) { status = best_psa->addPublication(best_psa->admin,pub); if(status==CELIX_SUCCESS){ celixThreadMutex_lock(&manager->discoveryListLock); @@ -585,14 +596,14 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra listener_hook_info_pt info = arrayList_get(listeners, l_index); pubsub_endpoint_pt pubcmp = NULL; - if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) == CELIX_SUCCESS){ + if(pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pubcmp) == CELIX_SUCCESS){ unsigned int j,k; celixThreadMutex_lock(&manager->psaListLock); celixThreadMutex_lock(&manager->publicationsLock); - char *pub_key = createScopeTopicKey(properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); if(pub_list_by_topic!=NULL){ for(j=0;j<arrayList_size(pub_list_by_topic);j++){ @@ -625,7 +636,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra if(arrayList_size(pub_list_by_topic)==0){ for(k=0;k<arrayList_size(manager->psaList);k++){ pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); - psa->closeAllPublications(psa->admin, (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + psa->closeAllPublications(psa->admin, (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); } } @@ -651,16 +662,20 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_endpoint_pt pubEP){ celix_status_t status = CELIX_SUCCESS; - printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, ep=%s]\n", - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), - properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + pubsub_topology_manager_pt manager = handle; + + if (manager->verbose) { + printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, ep=%s]\n", + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + - pubsub_topology_manager_pt manager = handle; celixThreadMutex_lock(&manager->psaListLock); celixThreadMutex_lock(&manager->publicationsLock); - char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); if(pub_list_by_topic==NULL){ @@ -672,7 +687,7 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end /* Shouldn't be any other duplicate, since it's filtered out by the discovery */ pubsub_endpoint_pt p = NULL; pubsubEndpoint_clone(pubEP, &p); - arrayList_add(pub_list_by_topic,p); + arrayList_add(pub_list_by_topic , p); unsigned int j; double score = 0; @@ -681,14 +696,16 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end for(j=0;j<arrayList_size(manager->psaList);j++){ pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); - psa->matchEndpoint(psa->admin,p,&score); - if(score>best_score){ /* We have a new winner! */ + psa->matchEndpoint(psa->admin , p, &score); + if (score>best_score) { /* We have a new winner! */ best_score = score; best_psa = psa; } } - if(best_psa != NULL && best_score>0){ + if(best_psa != NULL && best_score>0) { + //TODO FIXME this the same call as used by publisher of service trackers. This is confusing. + //remote discovered publication can be handle different. best_psa->addPublication(best_psa->admin,p); } else{ @@ -703,20 +720,24 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpoint_pt pubEP){ celix_status_t status = CELIX_SUCCESS; - printf("PSTM: Publisher removed for topic %s [fwUUID=%s, ep=%s]\n", - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), - properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), - properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + pubsub_topology_manager_pt manager = handle; + + if (manager->verbose) { + printf("PSTM: Publisher removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n", + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID)); + } - pubsub_topology_manager_pt manager = handle; celixThreadMutex_lock(&manager->psaListLock); celixThreadMutex_lock(&manager->publicationsLock); unsigned int i; - char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); if(pub_list_by_topic==NULL){ - printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); status = CELIX_ILLEGAL_STATE; } else{ @@ -744,7 +765,7 @@ celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpo for(i=0;i<arrayList_size(manager->psaList);i++){ pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i); - psa->closeAllPublications(psa->admin, (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + psa->closeAllPublications(psa->admin, (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); } } http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h index cdcc651..769048d 100644 --- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h +++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h @@ -37,6 +37,9 @@ #include "pubsub/publisher.h" #include "pubsub/subscriber.h" + #define PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY "PUBSUB_TOPOLOGY_MANAGER_VERBOSE" +#define PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE false + struct pubsub_topology_manager { bundle_context_pt context; @@ -58,6 +61,8 @@ struct pubsub_topology_manager { log_helper_pt loghelper; + + bool verbose; }; typedef struct pubsub_topology_manager *pubsub_topology_manager_pt;
