http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h new file mode 100644 index 0000000..38a9127 --- /dev/null +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h @@ -0,0 +1,39 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +#ifndef CELIX_PUBSUB_UDPMC_TOPIC_SENDER_H +#define CELIX_PUBSUB_UDPMC_TOPIC_SENDER_H + +#include "celix_bundle_context.h" + +typedef struct pubsub_updmc_topic_sender pubsub_updmc_topic_sender_t; + +pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(celix_bundle_context_t *ctx, /*TODO rest args*/ const char *scope, const char *topic, long serializerSvcId); +void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender); + +const char* pubsub_udpmcTopicSender_psaType(pubsub_updmc_topic_sender_t *sender); +const char* pubsub_udpmcTopicSender_serializerType(pubsub_updmc_topic_sender_t *sender); +const char* pubsub_udpmcTopicSender_scope(pubsub_updmc_topic_sender_t *sender); +const char* pubsub_udpmcTopicSender_topic(pubsub_updmc_topic_sender_t *sender); +const char* pubsub_udpmcTopicSender_socketAddress(pubsub_updmc_topic_sender_t *sender); +//TODO connections etc + +void pubsub_udpmcTopicSender_connectTo(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint); +void pubsub_udpmcTopicSender_disconnectFrom(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint); + +#endif //CELIX_PUBSUB_UDPMC_TOPIC_SENDER_H
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c index 046feb6..eea8460 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c @@ -98,12 +98,12 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig static void delay_first_send_for_late_joiners(void); -celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char* best_serializer_type, char* bindIP, topic_publication_pt *out){ +celix_status_t pubsub_topicPublicationCreate(int sendSocket, celix_properties_t *pubEP, pubsub_serializer_service_t *best_serializer, const char* best_serializer_type, char* bindIP, topic_publication_pt *out){ char* ep = malloc(EP_ADDRESS_LEN); memset(ep,0,EP_ADDRESS_LEN); - long serviceId = celix_properties_getAsLong(pubEP->properties, OSGI_FRAMEWORK_SERVICE_ID, 0); + long serviceId = celix_properties_getAsLong(pubEP, OSGI_FRAMEWORK_SERVICE_ID, 0); unsigned int port = serviceId + rand_range(UDP_BASE_PORT+serviceId+3, UDP_MAX_PORT); snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port); @@ -169,7 +169,7 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top /* Let's register the new service */ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0); + celix_properties_t *pubEP = arrayList_get(pub->pub_ep_list,0); if(pubEP!=NULL){ service_factory_pt factory = calloc(1, sizeof(*factory)); @@ -178,8 +178,8 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top factory->ungetService = pubsub_topicPublicationUngetService; properties_pt props = properties_create(); - properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE)); - properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE)); + properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME)); properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION); @@ -188,8 +188,8 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top if(status != CELIX_SUCCESS){ properties_destroy(props); printf("[PSA UDPMC] Cannot register ServiceFactory for topic %s, topic %s.\n", - properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), - properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME)); } else { *svcFactory = factory; } @@ -206,19 +206,19 @@ celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){ return serviceRegistration_unregister(pub->svcFactoryReg); } -celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, pubsub_endpoint_pt ep) { +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, celix_properties_t *ep) { celixThreadMutex_lock(&(pub->tp_lock)); - pubsubEndpoint_setField(ep, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, pub->endpoint); - pubsubEndpoint_setField(ep, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE); - pubsubEndpoint_setField(ep, PUBSUB_SERIALIZER_TYPE_KEY, pub->serializer.type); + celix_properties_set(ep, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, pub->endpoint); + celix_properties_set(ep, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE); + celix_properties_set(ep, PUBSUB_SERIALIZER_TYPE_KEY, pub->serializer.type); arrayList_add(pub->pub_ep_list,ep); celixThreadMutex_unlock(&(pub->tp_lock)); return CELIX_SUCCESS; } -celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep) { +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,celix_properties_t *ep) { celixThreadMutex_lock(&(pub->tp_lock)); arrayList_removeElement(pub->pub_ep_list,ep); @@ -401,9 +401,9 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to tp->serializer.svc->createSerializerMap(tp->serializer.svc->handle,bundle,&bound->msgTypes); } - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0); - bound->scope=strdup(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE)); - bound->topic=strdup(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + celix_properties_t *pubEP = arrayList_get(bound->parent->pub_ep_list,0); + bound->scope=strdup(properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE)); + bound->topic=strdup(properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME)); bound->largeUdpHandle = largeUdp_create(1); bound->service.handle = bound; http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h index e0a5698..0dc89fb 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h @@ -43,11 +43,11 @@ typedef struct pubsub_udp_msg { } pubsub_udp_msg_t; typedef struct topic_publication *topic_publication_pt; -celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char* best_serializer_type, char* bindIP, topic_publication_pt *out); +celix_status_t pubsub_topicPublicationCreate(int sendSocket, celix_properties_t *pubEP, pubsub_serializer_service_t *best_serializer, const char* best_serializer_type, char* bindIP, topic_publication_pt *out); celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub); -celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, pubsub_endpoint_pt ep); -celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, celix_properties_t *ep); +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub, celix_properties_t *ep); celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory); celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub); http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c index 82adf24..e08f219 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c @@ -391,7 +391,7 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt return status; } -celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ +celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, celix_properties_t *subEP){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); @@ -412,7 +412,7 @@ celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) { return status; } -celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ +celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, celix_properties_t *subEP){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h index 475416a..c79e88c 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h @@ -49,8 +49,8 @@ celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL); celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL); -celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); -celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); +celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, celix_properties_t *subEP); +celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, celix_properties_t *subEP); array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub); celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription); http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c index 008dff5..3eb887b 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c @@ -16,171 +16,142 @@ *specific language governing permissions and limitations *under the License. */ -/* - * psa_activator.c + +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at * - * \date Sep 30, 2011 - * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. */ + #include <stdlib.h> -#include "bundle_activator.h" -#include "service_tracker.h" +#include "celix_api.h" +#include "pubsub_serializer.h" #include "pubsub_admin_impl.h" +typedef struct psa_zmq_activator { + pubsub_admin_t *admin; -struct activator { - pubsub_admin_pt admin; - pubsub_admin_service_pt adminService; - service_registration_pt registration; - service_tracker_pt serializerTracker; -}; + pubsub_admin_service_t adminService; + long adminSvcId; + command_service_t cmdSvc; + long cmdSvcId; -static celix_status_t shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream) { - struct activator *act= (struct activator*)handle; - if (act->admin->externalPublications && !hashMap_isEmpty(act->admin->externalPublications)) { - fprintf(outStream, "External Publications:\n"); - for(hash_map_iterator_t iter = hashMapIterator_construct(act->admin->externalPublications); hashMapIterator_hasNext(&iter);) { - const char* key = (const char*)hashMapIterator_nextKey(&iter); - fprintf(outStream, " %s\n", key); - } - } - if (act->admin->localPublications && !hashMap_isEmpty(act->admin->localPublications)) { - fprintf(outStream, "Local Publications:\n"); - for (hash_map_iterator_t iter = hashMapIterator_construct( - act->admin->localPublications); hashMapIterator_hasNext(&iter);) { - const char *key = (const char *) hashMapIterator_nextKey(&iter); - fprintf(outStream, " %s\n", key); - } - } - if (act->admin->subscriptions && !hashMap_isEmpty(act->admin->subscriptions)) { - fprintf(outStream, "Active Subscriptions:\n"); - for (hash_map_iterator_t iter = hashMapIterator_construct( - act->admin->subscriptions); hashMapIterator_hasNext(&iter);) { - const char *key = (const char *) hashMapIterator_nextKey(&iter); - fprintf(outStream, " %s\n", key); - } - } - if (act->admin->pendingSubscriptions && !hashMap_isEmpty(act->admin->pendingSubscriptions)) { - fprintf(outStream, "Pending Subscriptions:\n"); - for (hash_map_iterator_t iter = hashMapIterator_construct( - act->admin->pendingSubscriptions); hashMapIterator_hasNext(&iter);) { - const char *key = (const char *) hashMapIterator_nextKey(&iter); - fprintf(outStream, " %s\n", key); - } - } - return CELIX_SUCCESS; -} - -celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator; + long serializerTrackerId; +} psa_zmq_activator_t; - activator = calloc(1, sizeof(*activator)); - if (!activator) { - status = CELIX_ENOMEM; +static celix_status_t shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream) { + psa_zmq_activator_t *act= handle; + if (act->admin->externalPublications && !hashMap_isEmpty(act->admin->externalPublications)) { + fprintf(outStream, "External Publications:\n"); + for(hash_map_iterator_t iter = hashMapIterator_construct(act->admin->externalPublications); hashMapIterator_hasNext(&iter);) { + const char* key = (const char*)hashMapIterator_nextKey(&iter); + fprintf(outStream, " %s\n", key); + } } - else{ - *userData = activator; - - status = pubsubAdmin_create(context, &(activator->admin)); - - if(status == CELIX_SUCCESS){ - service_tracker_customizer_pt customizer = NULL; - status = serviceTrackerCustomizer_create(activator->admin, - NULL, - pubsubAdmin_serializerAdded, - NULL, - pubsubAdmin_serializerRemoved, - &customizer); - if(status == CELIX_SUCCESS){ - status = serviceTracker_create(context, PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker)); - if (status == CELIX_SUCCESS) { - properties_pt shellProps = properties_create(); - properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_zmq_info"); - properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_zmq_info"); - properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "psa_zmq_info: Overview of PubSub ZMQ Admin"); - activator->admin->shellCmdService.handle = activator; - activator->admin->shellCmdService.executeCommand = shellCommand; - bundleContext_registerService(context, OSGI_SHELL_COMMAND_SERVICE_NAME, &activator->admin->shellCmdService, shellProps, &activator->admin->shellCmdReg); - } else { - serviceTrackerCustomizer_destroy(customizer); - pubsubAdmin_destroy(activator->admin); - } - } - else{ - pubsubAdmin_destroy(activator->admin); - } + if (act->admin->localPublications && !hashMap_isEmpty(act->admin->localPublications)) { + fprintf(outStream, "Local Publications:\n"); + for (hash_map_iterator_t iter = hashMapIterator_construct( + act->admin->localPublications); hashMapIterator_hasNext(&iter);) { + const char *key = (const char *) hashMapIterator_nextKey(&iter); + fprintf(outStream, " %s\n", key); } } - return status; -} - -celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - pubsub_admin_service_pt pubsubAdminSvc = calloc(1, sizeof(*pubsubAdminSvc)); - - if (!pubsubAdminSvc) { - status = CELIX_ENOMEM; + if (act->admin->subscriptions && !hashMap_isEmpty(act->admin->subscriptions)) { + fprintf(outStream, "Active Subscriptions:\n"); + for (hash_map_iterator_t iter = hashMapIterator_construct( + act->admin->subscriptions); hashMapIterator_hasNext(&iter);) { + const char *key = (const char *) hashMapIterator_nextKey(&iter); + fprintf(outStream, " %s\n", key); + } } - else{ - pubsubAdminSvc->admin = activator->admin; - - pubsubAdminSvc->addPublication = pubsubAdmin_addPublication; - pubsubAdminSvc->removePublication = pubsubAdmin_removePublication; - - pubsubAdminSvc->addSubscription = pubsubAdmin_addSubscription; - pubsubAdminSvc->removeSubscription = pubsubAdmin_removeSubscription; - - pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications; - pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions; - - pubsubAdminSvc->matchEndpoint = pubsubAdmin_matchEndpoint; + if (act->admin->pendingSubscriptions && !hashMap_isEmpty(act->admin->pendingSubscriptions)) { + fprintf(outStream, "Pending Subscriptions:\n"); + for (hash_map_iterator_t iter = hashMapIterator_construct( + act->admin->pendingSubscriptions); hashMapIterator_hasNext(&iter);) { + const char *key = (const char *) hashMapIterator_nextKey(&iter); + fprintf(outStream, " %s\n", key); + } + } + return CELIX_SUCCESS; +} - activator->adminService = pubsubAdminSvc; +int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) { + act->adminSvcId = -1L; + act->serializerTrackerId = -1l; - status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration); + celix_status_t status = pubsubAdmin_create(ctx, &(act->admin)); - status += serviceTracker_open(activator->serializerTracker); + //track pubsub serializers + if (status == CELIX_SUCCESS) { + celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; + opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME; + opts.filter.ignoreServiceLanguage = true; + opts.callbackHandle = act->admin; + opts.addWithProperties = pubsubAdmin_addSerializer; + opts.removeWithProperties = pubsubAdmin_removeSerializer; + act->serializerTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); } + //register psa service + if (status == CELIX_SUCCESS) { + pubsub_admin_service_t *psaSvc = &act->adminService; + psaSvc->handle = act->admin; + psaSvc->matchPublisher = pubsubAdmin_matchPublisher; + psaSvc->matchSubscriber = pubsubAdmin_matchSubscriber; + psaSvc->matchEndpoint = pubsubAdmin_matchEndpoint; + psaSvc->setupTopicSender = pubsubAdmin_setupTopicSender; + psaSvc->teardownTopicSender = pubsubAdmin_teardownTopicSender; + psaSvc->setupTopicReciever = pubsubAdmin_setupTopicReciever; + psaSvc->teardownTopicReciever = pubsubAdmin_teardownTopicReciever; + psaSvc->addEndpoint = pubsubAdmin_addEndpoint; + psaSvc->removeEndpoint = pubsubAdmin_removeEndpoint; + + celix_properties_t *props = celix_properties_create(); + celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_PSA_ZMQ_PSA_TYPE); + + act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props); + } - return status; -} - -celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - - status += serviceTracker_close(activator->serializerTracker); - serviceRegistration_unregister(activator->admin->shellCmdReg); - activator->admin->shellCmdReg = NULL; - status += serviceRegistration_unregister(activator->registration); + //register psa shell command + if (status == CELIX_SUCCESS) { + act->cmdSvc.handle = act; + act->cmdSvc.executeCommand = shellCommand; - activator->registration = NULL; + celix_properties_t *shellProps = celix_properties_create(); + celix_properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_zmq_info"); + celix_properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_zmq_info"); + celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "psa_zmq_info: Overview of PubSub ZMQ Admin"); - free(activator->adminService); - activator->adminService = NULL; + act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, shellProps); + } return status; } -celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - - serviceTracker_destroy(activator->serializerTracker); - pubsubAdmin_destroy(activator->admin); - activator->admin = NULL; - free(activator); - - return status; +int psa_zmq_stop(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) { + celix_bundleContext_unregisterService(ctx, act->adminSvcId); + celix_bundleContext_unregisterService(ctx, act->cmdSvcId); + celix_bundleContext_stopTracker(ctx, act->serializerTrackerId); + pubsubAdmin_destroy(act->admin); + return CELIX_SUCCESS; } - +CELIX_GEN_BUNDLE_ACTIVATOR(psa_zmq_activator_t, psa_zmq_start, psa_zmq_stop); http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c index 0af73e4..34cc6f9 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c @@ -419,83 +419,107 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint return pubsubAdmin_addAnySubscription(admin,subEP); } - /* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */ - celixThreadMutex_lock(&admin->pendingSubscriptionsLock); - celixThreadMutex_lock(&admin->subscriptionsLock); - celixThreadMutex_lock(&admin->localPublicationsLock); - celixThreadMutex_lock(&admin->externalPublicationsLock); - - char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + pubsub_serializer_service_t *best_serializer = NULL; + const char *serType = NULL; + status = pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType); - service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); - array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); - if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic - pubsubAdmin_addSubscriptionToPendingList(admin,subEP); + if (status == CELIX_SUCCESS) { + //admin type and ser type now known -> update ep + pubsubEndpoint_setField(subEP, PUBSUB_ENDPOINT_ADMIN_TYPE, PSA_ZMQ_PUBSUB_ADMIN_TYPE); + pubsubEndpoint_setField(subEP, PUBSUB_ENDPOINT_SERIALIZER, serType); + } else { + printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n", + properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerSubscriptions,subEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); } - else{ - int i; - topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic); - if(subscription == NULL) { - pubsub_serializer_service_t *best_serializer = NULL; - const char *serType = NULL; - if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){ - status += pubsub_topicSubscriptionCreate(admin->bundle_context, (char*) properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, serType, &subscription); - } - else{ - printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n", - properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - arrayList_add(admin->noSerializerSubscriptions,subEP); - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - } + char *scope_topic = NULL; + if (status == CELIX_SUCCESS) { + /* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */ + celixThreadMutex_lock(&admin->pendingSubscriptionsLock); + celixThreadMutex_lock(&admin->subscriptionsLock); + celixThreadMutex_lock(&admin->localPublicationsLock); + celixThreadMutex_lock(&admin->externalPublicationsLock); - if (status==CELIX_SUCCESS){ + scope_topic = pubsubEndpoint_createScopeTopicKey( + properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); - /* Try to connect internal publishers */ - if(factory!=NULL){ - topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; - array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); + service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic); + array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic); - if(topic_publishers!=NULL){ - for(i=0;i<arrayList_size(topic_publishers);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i); - if(properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(subscription,(char*)properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY)); + if (factory == NULL && ext_pub_list == NULL) { //No (local or external) publishers yet for this topic + pubsubAdmin_addSubscriptionToPendingList(admin, subEP); + } else { + int i; + topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic); + + if (subscription == NULL) { + status += pubsub_topicSubscriptionCreate(admin->bundle_context, + (char *) properties_get(subEP->properties, + PUBSUB_ENDPOINT_TOPIC_SCOPE), + (char *) properties_get(subEP->properties, + PUBSUB_ENDPOINT_TOPIC_NAME), + best_serializer, serType, &subscription); + + if (status == CELIX_SUCCESS) { + //got type and serializer -> update endpoint + pubsubEndpoint_setField(subEP, PUBSUB_ENDPOINT_ADMIN_TYPE, PSA_ZMQ_PUBSUB_ADMIN_TYPE); + pubsubEndpoint_setField(subEP, PUBSUB_ENDPOINT_SERIALIZER, serType); + + /* Try to connect internal publishers */ + if (factory != NULL) { + topic_publication_pt topic_pubs = (topic_publication_pt) factory->handle; + array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); + + if (topic_publishers != NULL) { + for (i = 0; i < arrayList_size(topic_publishers); i++) { + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) arrayList_get(topic_publishers, i); + if (properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) != NULL) { + status += pubsub_topicSubscriptionConnectPublisher(subscription, + (char *) properties_get( + pubEP->properties, + PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY)); + } } + arrayList_destroy(topic_publishers); } - arrayList_destroy(topic_publishers); - } - } + } - /* Look also for external publishers */ - if(ext_pub_list!=NULL){ - for(i=0;i<arrayList_size(ext_pub_list);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); - if(properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(subscription,(char*) properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY)); + /* Look also for external publishers */ + if (ext_pub_list != NULL) { + for (i = 0; i < arrayList_size(ext_pub_list); i++) { + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) arrayList_get(ext_pub_list, i); + if (properties_get(pubEP->properties, PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY) != NULL) { + status += pubsub_topicSubscriptionConnectPublisher(subscription, + (char *) properties_get( + pubEP->properties, + PUBSUB_PSA_ZMQ_ENDPOINT_URL_KEY)); + } } } - } - pubsub_topicSubscriptionAddSubscriber(subscription,subEP); + pubsub_topicSubscriptionAddSubscriber(subscription, subEP); - status += pubsub_topicSubscriptionStart(subscription); + status += pubsub_topicSubscriptionStart(subscription); - } + } - if(status==CELIX_SUCCESS){ + if (status == CELIX_SUCCESS) { - hashMap_put(admin->subscriptions,strdup(scope_topic),subscription); + hashMap_put(admin->subscriptions, strdup(scope_topic), subscription); - connectTopicPubSubToSerializer(admin, best_serializer, subscription, false); + connectTopicPubSubToSerializer(admin, best_serializer, subscription, false); + } } - } - if (status == CELIX_SUCCESS){ - pubsub_topicIncreaseNrSubscribers(subscription); + if (status == CELIX_SUCCESS) { + pubsub_topicIncreaseNrSubscribers(subscription); + } } } http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h index 4569a97..e8f80b1 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h @@ -39,11 +39,13 @@ #include "pubsub_psa_zmq_constants.h" #include "pubsub_admin.h" -#include "pubsub_admin_match.h" +#include "pubsub_utils.h" #include "log_helper.h" #include "command.h" -struct pubsub_admin { +#define PUBSUB_PSA_ZMQ_PSA_TYPE "zmq" + +typedef struct pubsub_admin { bundle_context_pt bundle_context; log_helper_pt loghelper; @@ -82,31 +84,37 @@ struct pubsub_admin { unsigned int basePort; unsigned int maxPort; - command_service_t shellCmdService; - service_registration_pt shellCmdReg; - double qosSampleScore; double qosControlScore; double defaultScore; bool verbose; -}; +} pubsub_admin_t; + +celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_t **admin); +celix_status_t pubsubAdmin_destroy(pubsub_admin_t *admin); + +void pubsubAdmin_addSerializer(void * handle, void *svc, const celix_properties_t *properties); +void pubsubAdmin_removeSerializer(void * handle, void *svc, const celix_properties_t *properties); + + -celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin); -celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin); +//for the pubsub_admin_service -celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); -celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); +celix_status_t pubsubAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId); +celix_status_t pubsubAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId); +celix_status_t pubsubAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *score); -celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP); -celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP); +//note endpoint is owned by caller +celix_status_t pubsubAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint); +celix_status_t pubsubAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic); -celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic); -celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic); +//note endpoint is owned by caller +celix_status_t pubsubAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint); +celix_status_t pubsubAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic); -celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service); -celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service); +celix_status_t pubsubAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint); +celix_status_t pubsubAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint); -celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score); #endif /* PUBSUB_ADMIN_ZMQ_IMPL_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c index 35da907..07f4466 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c @@ -113,7 +113,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig static void delay_first_send_for_late_joiners(void); -celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char* serType, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){ +celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, celix_properties_t *pubEP, pubsub_serializer_service_t *best_serializer, const char* serType, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){ celix_status_t status = CELIX_SUCCESS; #ifdef BUILD_WITH_ZMQ_SECURITY @@ -279,7 +279,7 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top /* Let's register the new service */ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0); + celix_properties_t *pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0); if(pubEP!=NULL){ service_factory_pt factory = calloc(1, sizeof(*factory)); @@ -315,7 +315,7 @@ celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){ return serviceRegistration_unregister(pub->svcFactoryReg); } -celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, pubsub_endpoint_pt ep) { +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, celix_properties_t *ep) { celixThreadMutex_lock(&(pub->tp_lock)); pubsubEndpoint_setField(ep, PUBSUB_ADMIN_TYPE_KEY, PSA_ZMQ_PUBSUB_ADMIN_TYPE); @@ -327,11 +327,11 @@ celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, p return CELIX_SUCCESS; } -celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,celix_properties_t *ep){ celixThreadMutex_lock(&(pub->tp_lock)); for (int i = 0; i < arrayList_size(pub->pub_ep_list); i++) { - pubsub_endpoint_pt e = arrayList_get(pub->pub_ep_list, i); + celix_properties_t *e = arrayList_get(pub->pub_ep_list, i); if(pubsubEndpoint_equals(ep, e)) { arrayList_removeElement(pub->pub_ep_list,ep); break; @@ -588,7 +588,7 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to arrayList_create(&bound->mp_parts); - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0); + celix_properties_t *pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0); bound->topic=strdup(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); bound->service.handle = bound; http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h index 20e4a8e..0fab6d7 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h +++ b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h @@ -35,11 +35,11 @@ typedef struct topic_publication *topic_publication_pt; -celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char *serType, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out); +celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, celix_properties_t *endpoint, pubsub_serializer_service_t *best_serializer, const char *serType, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out); celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub); -celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); -celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, celix_properties_t *ep); +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub, celix_properties_t *ep); celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory); celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub); http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.h b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.h index 6dca4e5..03dfd9e6a 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.h +++ b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.h @@ -49,8 +49,8 @@ celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL); celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL); -celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); -celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); +celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, celix_properties_t *subEP); +celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, celix_properties_t *subEP); array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub); celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription); http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c index fcf0823..98d6be6 100644 --- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c +++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c @@ -79,6 +79,7 @@ celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discove (*ps_discovery)->ttlForEntries = (int)ttl; (*ps_discovery)->sleepInsecBetweenTTLRefresh = (int)(((float)ttl)/2.0); (*ps_discovery)->pubsubPath = celix_bundleContext_getProperty(context, PUBSUB_DISCOVERY_SERVER_PATH_KEY, PUBSUB_DISCOVERY_SERVER_PATH_DEFAULT); + (*ps_discovery)->fwUUID = celix_bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL); return status; } @@ -111,39 +112,58 @@ celix_status_t pubsub_discovery_destroy(pubsub_discovery_t *ps_discovery) { return status; } -void* psd_watch(void *data) { - pubsub_discovery_t *disc = data; - celixThreadMutex_lock(&disc->runningMutex); - bool running = disc->running; - celixThreadMutex_unlock(&disc->runningMutex); +static void psd_etcdReadCallback(const char *key __attribute__((unused)), const char *value, void* arg) { + pubsub_discovery_t *disc = arg; + celix_properties_t *props = pubsub_discovery_parseEndpoint(value); + if (props != NULL) { + pubsub_discovery_addDiscoveredEndpoint(disc, props); + } +} - long long prevIndex = 0L; +static void psd_watchSetupConnection(pubsub_discovery_t *disc, bool *connectedPtr, long long *mIndex) { + bool connected = *connectedPtr; + if (!connected) { + if (disc->verbose) { + printf("[PSD] Reading etcd directory at %s\n", disc->pubsubPath); + } + int rc = etcd_get_directory(disc->pubsubPath, psd_etcdReadCallback, disc, mIndex); + if (rc == ETCDLIB_RC_OK) { + *connectedPtr = true; + } else { + *connectedPtr = false; + } + } +} + +static void psd_watchForChange(pubsub_discovery_t *disc, bool *connectedPtr, long long *mIndex) { + bool connected = *connectedPtr; + if (connected) { + long long watchIndex = *mIndex + 1; - while (running) { char *action = NULL; char *value = NULL; char *readKey = NULL; - long long mIndex; //TODO add interruptable etcd_wait -> which returns a handle to interrupt and a can be used for a wait call - int rc = etcd_watch(disc->pubsubPath, prevIndex, &action, NULL, &value, &readKey, &mIndex); + int rc = etcd_watch(disc->pubsubPath, watchIndex, &action, NULL, &value, &readKey, mIndex); if (rc == ETCDLIB_RC_TIMEOUT) { //nop - } else if (rc == ETCDLIB_RC_ERROR) { - printf("WARNING PSD: Error communicating with etcd.\n"); + } else if (rc == ETCDLIB_RC_ERROR || action == NULL) { + printf("[PSD] Error communicating with etcd. rc is %i, action value is %s\n", rc, action); + *connectedPtr = false; } else { if (strncmp(ETCDLIB_ACTION_CREATE, action, strlen(ETCDLIB_ACTION_CREATE)) == 0 || - strncmp(ETCDLIB_ACTION_SET, action, strlen(ETCDLIB_ACTION_SET)) == 0 || - strncmp(ETCDLIB_ACTION_UPDATE, action, strlen(ETCDLIB_ACTION_UPDATE)) == 0) { + strncmp(ETCDLIB_ACTION_SET, action, strlen(ETCDLIB_ACTION_SET)) == 0 || + strncmp(ETCDLIB_ACTION_UPDATE, action, strlen(ETCDLIB_ACTION_UPDATE)) == 0) { celix_properties_t *props = pubsub_discovery_parseEndpoint(value); if (props != NULL) { pubsub_discovery_addDiscoveredEndpoint(disc, props); } } else if (strncmp(ETCDLIB_ACTION_DELETE, action, strlen(ETCDLIB_ACTION_DELETE)) == 0 || - strncmp(ETCDLIB_ACTION_EXPIRE, action, strlen(ETCDLIB_ACTION_EXPIRE)) == 0) { - celix_properties_t *props = pubsub_discovery_parseEndpoint(value); - if (props != NULL) { - const char *uuid = celix_properties_get(props, PUBSUB_ENDPOINT_UUID, NULL); + strncmp(ETCDLIB_ACTION_EXPIRE, action, strlen(ETCDLIB_ACTION_EXPIRE)) == 0) { + char *uuid = strrchr(readKey, '/'); + if (uuid != NULL) { + uuid = uuid + 1; pubsub_discovery_removeDiscoveredEndpoint(disc, uuid); } } else { @@ -153,9 +173,61 @@ void* psd_watch(void *data) { free(action); free(value); free(readKey); - prevIndex = mIndex; } + } else { + if (disc->verbose) { + printf("[PSD] Skipping etcd watch -> not connected\n"); + } + } +} + +static void psd_cleanupIfDisconnected(pubsub_discovery_t *disc, bool *connectedPtr) { + bool connected = *connectedPtr; + if (!connected) { + + celixThreadMutex_lock(&disc->discoveredEndpointsMutex); + int size = hashMap_size(disc->discoveredEndpoints); + if (disc->verbose) { + printf("[PSD] Removing all discovered entries (%i) -> not connected\n", size); + } + + hash_map_iterator_t iter = hashMapIterator_construct(disc->discoveredEndpoints); + while (hashMapIterator_hasNext(&iter)) { + celix_properties_t *endpoint = hashMapIterator_nextValue(&iter); + + celixThreadMutex_lock(&disc->discoveredEndpointsListenersMutex); + hash_map_iterator_t iter2 = hashMapIterator_construct(disc->discoveredEndpointsListeners); + while (hashMapIterator_hasNext(&iter2)) { + pubsub_discovered_endpoint_listener_t *listener = hashMapIterator_nextValue(&iter2); + listener->removeDiscoveredEndpoint(listener->handle, endpoint); + } + celixThreadMutex_unlock(&disc->discoveredEndpointsListenersMutex); + + celix_properties_destroy(endpoint); + } + hashMap_clear(disc->discoveredEndpoints, false, false); + celixThreadMutex_unlock(&disc->discoveredEndpointsMutex); + } +} + +void* psd_watch(void *data) { + pubsub_discovery_t *disc = data; + + long long mIndex = 0L; + bool connected = false; + celixThreadMutex_lock(&disc->runningMutex); + bool running = disc->running; + celixThreadMutex_unlock(&disc->runningMutex); + + while (running) { + psd_watchSetupConnection(disc, &connected, &mIndex); + psd_watchForChange(disc, &connected, &mIndex); + psd_cleanupIfDisconnected(disc, &connected); + + if (!connected) { + usleep(5000000); //if not connected wait a few seconds + } celixThreadMutex_lock(&disc->runningMutex); running = disc->running; @@ -173,8 +245,8 @@ void* psd_refresh(void *data) { celixThreadMutex_unlock(&disc->runningMutex); while (running) { - struct timeval start; - gettimeofday(&start, NULL); + struct timespec start; + clock_gettime(CLOCK_REALTIME, &start); celixThreadMutex_lock(&disc->announcedEndpointsMutex); hash_map_iterator_t iter = hashMapIterator_construct(disc->announcedEndpoints); @@ -199,21 +271,11 @@ void* psd_refresh(void *data) { } celixThreadMutex_unlock(&disc->announcedEndpointsMutex); - struct timeval end; - gettimeofday(&end, NULL); - - double s = start.tv_sec + (start.tv_usec / 1000.0 / 1000.0 ); - double e = end.tv_sec + (end.tv_usec / 1000.0 / 1000.0 ); - double elapsedInsec = e - s; - double sleepNeededInSec = disc->sleepInsecBetweenTTLRefresh - elapsedInsec; - if (sleepNeededInSec > 0) { - celixThreadMutex_lock(&disc->waitMutex); - double waitTill = sleepNeededInSec + end.tv_sec + (end.tv_usec / 1000.0 / 1000.0); - long sec = (long)waitTill; - long nsec = (long)((waitTill - sec) * 1000 * 1000 * 1000); - celixThreadCondition_timedwait(&disc->waitCond, &disc->waitMutex, sec, nsec); - celixThreadMutex_unlock(&disc->waitMutex); - } + struct timespec waitTill = start; + waitTill.tv_sec += disc->sleepInsecBetweenTTLRefresh; + celixThreadMutex_lock(&disc->waitMutex); + pthread_cond_timedwait(&disc->waitCond, &disc->waitMutex, &waitTill); //TODO add timedwait abs for celixThread + celixThreadMutex_unlock(&disc->waitMutex); celixThreadMutex_lock(&disc->runningMutex); running = disc->running; @@ -226,7 +288,10 @@ celix_status_t pubsub_discovery_start(pubsub_discovery_t *ps_discovery) { celix_status_t status = CELIX_SUCCESS; celixThread_create(&ps_discovery->watchThread, NULL, psd_watch, ps_discovery); + celixThread_setName(&ps_discovery->watchThread, "PubSub ETCD Watch"); celixThread_create(&ps_discovery->refreshTTLThread, NULL, psd_refresh, ps_discovery); + celixThread_setName(&ps_discovery->refreshTTLThread, "PubSub ETCD Refresh TTL"); + return status; } @@ -244,7 +309,6 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_t *disc) { celixThread_join(disc->watchThread, NULL); celixThread_join(disc->refreshTTLThread, NULL); - //TODO NOTE double lock , check if this is always done in the same order celixThreadMutex_lock(&disc->discoveredEndpointsMutex); hash_map_iterator_t iter = hashMapIterator_construct(disc->discoveredEndpoints); while (hashMapIterator_hasNext(&iter)) { @@ -367,7 +431,27 @@ celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_propert static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, celix_properties_t *endpoint) { const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL); - assert(uuid != NULL); //note endpoint should already be check to be valid pubsubEndpoint_isValid + const char *fwUUID = celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL); + + //note endpoint should already be check to be valid pubsubEndpoint_isValid + assert(uuid != NULL); + assert(fwUUID != NULL); + + if (fwUUID != NULL && strncmp(disc->fwUUID, fwUUID, strlen(disc->fwUUID)) == 0) { + if (disc->verbose) { + printf("[PSD] Ignoring endpoint %s from own framework\n", uuid); + } + return; + } + + if (disc->verbose) { + const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, "!Error!"); + const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!"); + const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); + const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!"); + printf("[PSD] Adding discovered endpoint %s. type is %s, admin is %s, serializer is %s.\n", + uuid, type, admin, ser); + } celixThreadMutex_lock(&disc->discoveredEndpointsMutex); bool exists = hashMap_containsKey(disc->discoveredEndpoints, (void*)uuid); @@ -383,7 +467,7 @@ static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, cel } celixThreadMutex_unlock(&disc->discoveredEndpointsListenersMutex); } else { - fprintf(stderr, "[PSD] Warning unexpected update from an already existing endpoint (uuid is %s)\n", uuid); + //assuming this is the same endpoint -> ignore } } @@ -393,6 +477,20 @@ static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc, celix_properties_t *endpoint = hashMap_remove(disc->discoveredEndpoints, (void*)uuid); celixThreadMutex_unlock(&disc->discoveredEndpointsMutex); + if (endpoint == NULL) { + //NOTE assuming this was a endpoint from this framework -> ignore + return; + } + + if (disc->verbose) { + const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, "!Error!"); + const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!"); + const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); + const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!"); + printf("[PSD] Removing discovered endpoint %s. type is %s, admin is %s, serializer is %s.\n", + uuid, type, admin, ser); + } + if (exists && endpoint != NULL) { celixThreadMutex_lock(&disc->discoveredEndpointsListenersMutex); hash_map_iterator_t iter = hashMapIterator_construct(disc->discoveredEndpointsListeners); http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h index 11663cb..f0a5b22 100644 --- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h +++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h @@ -40,20 +40,21 @@ #define PUBSUB_DISCOVERY_SERVER_IP_DEFAULT "127.0.0.1" #define PUBSUB_DISCOVERY_SERVER_PORT_DEFAULT 2379 -#define PUBSUB_DISCOVERY_SERVER_PATH_DEFAULT "pubsub/discovery" +#define PUBSUB_DISCOVERY_SERVER_PATH_DEFAULT "pubsub/" #define PUBSUB_DISCOVERY_ETCD_TTL_DEFAULT 30 typedef struct pubsub_discovery { bundle_context_pt context; + //TODO add logHelper - celix_thread_mutex_t discoveredEndpointsMutex; + celix_thread_mutex_t discoveredEndpointsMutex; //when locked with EndpointsListenersMutex -> first lock this hash_map_pt discoveredEndpoints; //<key = uuid,celix_properties_t /*endpoint*/>> celix_thread_mutex_t announcedEndpointsMutex; hash_map_pt announcedEndpoints; //<key = char* (etcd key),pubsub_announce_entry_t /*endpoint*/>> celix_thread_mutex_t discoveredEndpointsListenersMutex; - hash_map_pt discoveredEndpointsListeners; //key=svcId, value=pubsub_discover_listener_t + hash_map_pt discoveredEndpointsListeners; //key=svcId, value=pubsub_discovered_endpoint_listener_t celix_thread_mutex_t waitMutex; celix_thread_cond_t waitCond; @@ -69,6 +70,7 @@ typedef struct pubsub_discovery { bool verbose; int ttlForEntries; int sleepInsecBetweenTTLRefresh; + const char *fwUUID; } pubsub_discovery_t; typedef struct pubsub_announce_entry { @@ -83,15 +85,10 @@ celix_status_t pubsub_discovery_destroy(pubsub_discovery_t *node_discovery); celix_status_t pubsub_discovery_start(pubsub_discovery_t *node_discovery); celix_status_t pubsub_discovery_stop(pubsub_discovery_t *node_discovery); -celix_status_t pubsub_discovery_addNode(pubsub_discovery_t *node_discovery, pubsub_endpoint_pt pubEP); -celix_status_t pubsub_discovery_removeNode(pubsub_discovery_t *node_discovery, pubsub_endpoint_pt pubEP); - void pubsub_discovery_discoveredEndpointsListenerAdded(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd); void pubsub_discovery_discoveredEndpointsListenerRemoved(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd); celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_properties_t *endpoint); celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_properties_t *endpoint); -celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_t *discovery, pubsub_endpoint_pt endpoint, bool endpointAdded); - #endif /* PUBSUB_DISCOVERY_IMPL_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt b/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt index ce56037..010e864 100644 --- a/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt +++ b/bundles/pubsub/pubsub_serializer_json/CMakeLists.txt @@ -23,7 +23,7 @@ add_celix_bundle(celix_pubsub_serializer_json VERSION "1.0.0" GROUP "Celix/PubSub" SOURCES - src/ps_activator.c + src/ps_json_serializer_activator.c src/pubsub_serializer_impl.c ) target_include_directories(celix_pubsub_serializer_json PRIVATE http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_serializer_json/src/ps_activator.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_serializer_json/src/ps_activator.c b/bundles/pubsub/pubsub_serializer_json/src/ps_activator.c deleted file mode 100644 index 32dd1fc..0000000 --- a/bundles/pubsub/pubsub_serializer_json/src/ps_activator.c +++ /dev/null @@ -1,108 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * ps_activator.c - * - * \date Mar 24, 2017 - * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#include <stdlib.h> - -#include "bundle_activator.h" -#include "service_registration.h" -#include "pubsub_constants.h" - -#include "pubsub_serializer_impl.h" - -struct activator { - pubsub_serializer_t* serializer; - pubsub_serializer_service_t* serializerService; - service_registration_pt registration; -}; - -celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator; - - activator = calloc(1, sizeof(*activator)); - if (!activator) { - status = CELIX_ENOMEM; - } - else{ - *userData = activator; - status = pubsubSerializer_create(context, &(activator->serializer)); - } - - return status; -} - -celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - pubsub_serializer_service_t* pubsubSerializerSvc = calloc(1, sizeof(*pubsubSerializerSvc)); - - if (!pubsubSerializerSvc) { - status = CELIX_ENOMEM; - } - else{ - pubsubSerializerSvc->handle = activator->serializer; - - pubsubSerializerSvc->createSerializerMap = (void*)pubsubSerializer_createSerializerMap; - pubsubSerializerSvc->destroySerializerMap = (void*)pubsubSerializer_destroySerializerMap; - activator->serializerService = pubsubSerializerSvc; - - /* Set serializer type */ - properties_pt props = properties_create(); - properties_set(props, PUBSUB_SERIALIZER_TYPE_KEY, PUBSUB_SERIALIZER_TYPE); - - status = bundleContext_registerService(context, PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc, props, &activator->registration); - - } - - return status; -} - -celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - - serviceRegistration_unregister(activator->registration); - activator->registration = NULL; - - free(activator->serializerService); - activator->serializerService = NULL; - - return status; -} - -celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - - pubsubSerializer_destroy(activator->serializer); - activator->serializer = NULL; - - free(activator); - - return status; -} - - http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c b/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c new file mode 100644 index 0000000..b74bab8 --- /dev/null +++ b/bundles/pubsub/pubsub_serializer_json/src/ps_json_serializer_activator.c @@ -0,0 +1,59 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#include <stdlib.h> +#include <pubsub_constants.h> + +#include "celix_api.h" +#include "pubsub_serializer_impl.h" + +typedef struct psjs_activator { + pubsub_serializer_t* serializer; + ; + pubsub_serializer_service_t serializerSvc; + long serializerSvcId; +} psjs_activator_t; + +static int psjs_start(psjs_activator_t *act, celix_bundle_context_t *ctx) { + act->serializerSvcId = -1L; + + celix_status_t status = pubsubSerializer_create(ctx, &(act->serializer)); + if (status == CELIX_SUCCESS) { + act->serializerSvc.handle = act->serializer; + + act->serializerSvc.createSerializerMap = (void*)pubsubSerializer_createSerializerMap; + act->serializerSvc.destroySerializerMap = (void*)pubsubSerializer_destroySerializerMap; + + /* Set serializer type */ + celix_properties_t *props = celix_properties_create(); + celix_properties_set(props, PUBSUB_SERIALIZER_TYPE_KEY, PUBSUB_JSON_SERIALIZER_TYPE); + + act->serializerSvcId = celix_bundleContext_registerService(ctx, &act->serializerSvc, PUBSUB_SERIALIZER_SERVICE_NAME, props); + } + return status; +} + +static int psjs_stop(psjs_activator_t *act, celix_bundle_context_t *ctx) { + celix_bundleContext_unregisterService(ctx, act->serializerSvcId); + act->serializerSvcId = -1L; + pubsubSerializer_destroy(act->serializer); + return CELIX_SUCCESS; +} + +CELIX_GEN_BUNDLE_ACTIVATOR(psjs_activator_t, psjs_start, psjs_stop) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h index c36f20e..75b72cb 100644 --- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h +++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h @@ -16,13 +16,6 @@ *specific language governing permissions and limitations *under the License. */ -/* - * pubsub_serializer_impl.h - * - * \date Mar 24, 2017 - * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ #ifndef PUBSUB_SERIALIZER_JSON_H_ #define PUBSUB_SERIALIZER_JSON_H_ @@ -34,7 +27,7 @@ #include "pubsub_serializer.h" -#define PUBSUB_SERIALIZER_TYPE "json" +#define PUBSUB_JSON_SERIALIZER_TYPE "json" typedef struct pubsub_serializer { bundle_context_pt bundle_context; http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_spi/CMakeLists.txt b/bundles/pubsub/pubsub_spi/CMakeLists.txt index c565660..7c27ed6 100644 --- a/bundles/pubsub/pubsub_spi/CMakeLists.txt +++ b/bundles/pubsub/pubsub_spi/CMakeLists.txt @@ -16,7 +16,7 @@ # under the License. add_library(pubsub_spi STATIC - src/pubsub_admin_match.c + src/pubsub_utils_match.c src/pubsub_endpoint.c src/pubsub_utils.c ) http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/include/pubsub_admin.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h index 5379415..8c15fcf 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h @@ -27,44 +27,41 @@ #ifndef PUBSUB_ADMIN_H_ #define PUBSUB_ADMIN_H_ -#include "service_reference.h" +#include "celix_properties.h" +#include "celix_bundle.h" +#include "celix_filter.h" -#include "pubsub_common.h" -#include "pubsub_endpoint.h" +#define PUBSUB_ADMIN_SERVICE_NAME "pubsub_admin" +#define PUBSUB_ADMIN_SERVICE_VERSION "2.0.0" +#define PUBSUB_ADMIN_SERVICE_RANGE "[2,3)" -#include "pubsub_constants.h" +//expected service properties +#define PUBSUB_ADMIN_SERVICE_TYPE "psa_type" - -typedef struct pubsub_admin *pubsub_admin_pt; +#define PUBSUB_ADMIN_FULL_MATCH_SCORE 100.0F +#define PUBSUB_ADMIN_NO_MATCH_SCORE 0.0F struct pubsub_admin_service { - pubsub_admin_pt admin; - - celix_status_t (*addSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); - celix_status_t (*removeSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); - - celix_status_t (*addPublication)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); - celix_status_t (*removePublication)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); + void *handle; - celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* scope, char* topic); - celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* scope, char* topic); + celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId); + celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId); + celix_status_t (*matchEndpoint)(void *handle, const celix_properties_t *endpoint, double *score); - //TODO add match function for subscription service and publication listeners, e.g.: - //matchPublisherListener(admin, bundle, filter, outScore) - //matchSubscriberService(admin, svcRef, outScore) + //note endpoint is owned by caller + celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint); + celix_status_t (*teardownTopicSender)(void *handle, const char *scope, const char *topic); - /* Match principle: - * - A full matching pubsub_admin gives 100 points - */ - //TODO this should only be called for remote endpoints (e.g. not endpoints from this framework - celix_status_t (*matchEndpoint)(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score); + //note endpoint is owned by caller + celix_status_t (*setupTopicReciever)(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint); + celix_status_t (*teardownTopicReciever)(void *handle, const char *scope, const char *topic); - //TODO redesign add function for handling endpoint seperate, e.g.: - //addEndpoint(admin, endpoint); - //note that endpoints can be subscribers and publishers - //Also note that we than can have pending subscribers and pending (subscriber/publisher) endpoints. + celix_status_t (*addEndpoint)(void *handle, const celix_properties_t *endpoint); + celix_status_t (*removeEndpoint)(void *handle, const celix_properties_t *endpoint); }; -typedef struct pubsub_admin_service *pubsub_admin_service_pt; +typedef struct pubsub_admin_service pubsub_admin_service_t; #endif /* PUBSUB_ADMIN_H_ */ + + http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/include/pubsub_admin_match.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin_match.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin_match.h deleted file mode 100644 index 08d6582..0000000 --- a/bundles/pubsub/pubsub_spi/include/pubsub_admin_match.h +++ /dev/null @@ -1,47 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ - - -#ifndef PUBSUB_ADMIN_MATCH_H_ -#define PUBSUB_ADMIN_MATCH_H_ - -#include "celix_errno.h" -#include "properties.h" -#include "array_list.h" - -#include "pubsub_serializer.h" - -#define QOS_ATTRIBUTE_KEY "attribute.qos" -#define QOS_TYPE_SAMPLE "sample" /* A.k.a. unreliable connection */ -#define QOS_TYPE_CONTROL "control" /* A.k.a. reliable connection */ - -#define PUBSUB_ADMIN_FULL_MATCH_SCORE 100.0F - -celix_status_t pubsub_admin_match( - pubsub_endpoint_pt endpoint, - const char *pubsub_admin_type, - const char *frameworkUuid, - double sampleScore, - double controlScore, - double defaultScore, - array_list_pt serializerList, - double *score); -celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, service_reference_pt *out); - -#endif /* PUBSUB_ADMIN_MATCH_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/include/pubsub_common.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_common.h b/bundles/pubsub/pubsub_spi/include/pubsub_common.h index cfed5d9..e551031 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_common.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_common.h @@ -27,13 +27,9 @@ #ifndef PUBSUB_COMMON_H_ #define PUBSUB_COMMON_H_ -#define PUBSUB_SERIALIZER_SERVICE "pubsub_serializer" -#define PUBSUB_ADMIN_SERVICE "pubsub_admin" #define PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE "pubsub_announce_endpoint_listener" #define PUBSUB_DISCOVERED_ENDPOINT_LISTENER_SERVICE "pubsub_discovered_endpoint_listener" -#define PUBSUB_ANY_SUB_TOPIC "any" - #define MAX_SCOPE_LEN 1024 #define MAX_TOPIC_LEN 1024 http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/include/pubsub_constants.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h index 47e31d3..be47318 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h @@ -20,10 +20,6 @@ #ifndef PUBSUB_CONSTANTS_H_ #define PUBSUB_CONSTANTS_H_ -#define PSA_IP "PSA_IP" -#define PSA_ITF "PSA_INTERFACE" -#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX" - #define PUBSUB_ADMIN_TYPE_KEY "pubsub.config" #define PUBSUB_SERIALIZER_TYPE_KEY "pubsub.serializer.type" http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h index 0d8c513..b03109d 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h @@ -43,33 +43,11 @@ #define PUBSUB_SUBSCRIBER_ENDPOINT_TYPE "pubsub.subscriber" -struct pubsub_endpoint { - const char *topicName; - const char *topicScope; +celix_properties_t* pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, const char* pubsubType, const char* adminType, const char *serType, celix_properties_t *topic_props); +celix_properties_t* pubsubEndpoint_createFromSubscriberSvc(bundle_context_t* ctx, long svcBndId, const celix_properties_t *svcProps); +celix_properties_t* pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context_t *ctx, long bundleId, const char *filter); - const char *uuid; - const char *frameworkUUid; - const char *type; - const char *adminType; - const char *serializerType; - - celix_properties_t *properties; -}; - -typedef struct pubsub_endpoint *pubsub_endpoint_pt; -typedef struct pubsub_endpoint pubsub_endpoint_t; - -celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, const char* pubsubType, const char* adminType, const char *serType, celix_properties_t *topic_props, pubsub_endpoint_pt* psEp); -celix_status_t pubsubEndpoint_createFromProperties(const celix_properties_t *props, pubsub_endpoint_t **out); -celix_status_t pubsubEndpoint_createFromSvc(bundle_context_t* ctx, const celix_bundle_t *bnd, const celix_properties_t *svcProps, bool isPublisher, pubsub_endpoint_pt* out); -celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t *ctx, const celix_service_tracker_info_t *info, bool isPublisher, pubsub_endpoint_pt* out); -celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out); - -void pubsubEndpoint_destroy(pubsub_endpoint_pt psEp); -bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2); -bool pubsubEndpoint_equalsWithProperties(pubsub_endpoint_pt psEp1, const celix_properties_t *props); - -void pubsubEndpoint_setField(pubsub_endpoint_t *ep, const char *key, const char *val); +bool pubsubEndpoint_equals(const celix_properties_t *psEp1, const celix_properties_t *psEp2); //check if the required properties are available for the endpoint bool pubsubEndpoint_isValid(const celix_properties_t *endpointProperties, bool requireAdminType, bool requireSerializerType); http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h index a91e820..9fa3340 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h @@ -16,21 +16,13 @@ *specific language governing permissions and limitations *under the License. */ -/* - * pubsub_serializer.h - * - * \date Mar 24, 2017 - * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ #ifndef PUBSUB_SERIALIZER_SERVICE_H_ #define PUBSUB_SERIALIZER_SERVICE_H_ -#include "service_reference.h" #include "hash_map.h" - -#include "pubsub_common.h" +#include "version.h" +#include "celix_bundle.h" /** * There should be a pubsub_serializer_t @@ -41,6 +33,10 @@ * the extender pattern. */ +#define PUBSUB_SERIALIZER_SERVICE_NAME "pubsub_serializer" +#define PUBSUB_SERIALIZER_SERVICE_VERSION "1.0.0" +#define PUBSUB_SERIALIZER_SERVICE_RANGE "[1,2)" + typedef struct pubsub_msg_serializer { void* handle; unsigned int msgId; @@ -56,7 +52,7 @@ typedef struct pubsub_msg_serializer { typedef struct pubsub_serializer_service { void* handle; - celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, hash_map_pt* serializerMap); + celix_status_t (*createSerializerMap)(void* handle, celix_bundle_t *bundle, hash_map_pt* serializerMap); celix_status_t (*destroySerializerMap)(void* handle, hash_map_pt serializerMap); } pubsub_serializer_service_t; http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_spi/include/pubsub_utils.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h index 1c92a9b..14f9bb0 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h @@ -16,19 +16,18 @@ *specific language governing permissions and limitations *under the License. */ -/* - * pubsub_utils.h - * - * \date Sep 24, 2015 - * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ #ifndef PUBSUB_UTILS_H_ #define PUBSUB_UTILS_H_ #include "bundle_context.h" -#include "array_list.h" +#include "celix_array_list.h" +#include "celix_bundle_context.h" + +#define PUBSUB_UTILS_QOS_ATTRIBUTE_KEY "qos" +#define PUBSUB_UTILS_QOS_TYPE_SAMPLE "sample" /* A.k.a. unreliable connection */ +#define PUBSUB_UTILS_QOS_TYPE_CONTROL "control" /* A.k.a. reliable connection */ + /** * Returns the pubsub info from the provided filter. A pubsub filter should have a topic and can @@ -41,5 +40,37 @@ celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topi char* pubsub_getKeysBundleDir(bundle_context_pt ctx); +double pubsub_utils_matchPublisher( + celix_bundle_context_t *ctx, + long bundleId, + const char *filter, + const char *adminType, + double sampleScore, + double controlScore, + double defaultScore, + long *outSerializerSvcId); + +double pubsub_utils_matchSubscriber( + celix_bundle_context_t *ctx, + long svcProviderBundleId, + const celix_properties_t *svcProperties, + const char *adminType, + double sampleScore, + double controlScore, + double defaultScore, + long *outSerializerSvcId); + +double pubsub_utils_matchEndpoint( + celix_bundle_context_t *ctx, + const celix_properties_t *endpoint, + const char *adminType, + double sampleScore, + double controlScore, + double defaultScore, + long *outSerializerSvcId); + + +celix_properties_t* pubsub_utils_getTopicProperties(const celix_bundle_t *bundle, const char *topic, bool isPublisher); + #endif /* PUBSUB_UTILS_H_ */