CELIX-454: More PubSub refactoring. Started creating a new skeleton for psa udpmc based on an updated pubsub spi.
Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/69596cfd Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/69596cfd Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/69596cfd Branch: refs/heads/feature/CELIX-454-pubsub-disc Commit: 69596cfdf93502eb2f723571394ab36537ea37ee Parents: e30a70f Author: Pepijn Noltes <pepijnnol...@gmail.com> Authored: Mon Sep 24 21:15:31 2018 +0200 Committer: Pepijn Noltes <pepijnnol...@gmail.com> Committed: Mon Sep 24 21:15:31 2018 +0200 ---------------------------------------------------------------------- bundles/pubsub/CMakeLists.txt | 2 + bundles/pubsub/examples/CMakeLists.txt | 40 +- .../private/include/pubsub_publisher_private.h | 13 +- .../publisher/private/src/ps_pub_activator.c | 108 +- .../publisher/private/src/pubsub_publisher.c | 14 +- .../pubsub/pubsub_admin_udp_mc/CMakeLists.txt | 8 +- .../pubsub_admin_udp_mc/src/psa_activator.c | 155 +-- .../pubsub_admin_udp_mc/src/pubsub_admin_impl.c | 318 +++-- .../pubsub_admin_udp_mc/src/pubsub_admin_impl.h | 37 +- .../src/pubsub_udpmc_admin.c | 335 +++++ .../src/pubsub_udpmc_admin.h | 92 ++ .../src/pubsub_udpmc_topic_sender.c | 99 ++ .../src/pubsub_udpmc_topic_sender.h | 39 + .../pubsub_admin_udp_mc/src/topic_publication.c | 30 +- .../pubsub_admin_udp_mc/src/topic_publication.h | 6 +- .../src/topic_subscription.c | 4 +- .../src/topic_subscription.h | 4 +- .../pubsub/pubsub_admin_zmq/src/psa_activator.c | 245 ++-- .../pubsub_admin_zmq/src/pubsub_admin_impl.c | 136 +- .../pubsub_admin_zmq/src/pubsub_admin_impl.h | 42 +- .../pubsub_admin_zmq/src/topic_publication.c | 12 +- .../pubsub_admin_zmq/src/topic_publication.h | 6 +- .../pubsub_admin_zmq/src/topic_subscription.h | 4 +- .../src/pubsub_discovery_impl.c | 174 ++- .../src/pubsub_discovery_impl.h | 13 +- .../pubsub_serializer_json/CMakeLists.txt | 2 +- .../pubsub_serializer_json/src/ps_activator.c | 108 -- .../src/ps_json_serializer_activator.c | 59 + 
.../src/pubsub_serializer_impl.h | 9 +- bundles/pubsub/pubsub_spi/CMakeLists.txt | 2 +- .../pubsub/pubsub_spi/include/pubsub_admin.h | 53 +- .../pubsub_spi/include/pubsub_admin_match.h | 47 - .../pubsub/pubsub_spi/include/pubsub_common.h | 4 - .../pubsub_spi/include/pubsub_constants.h | 4 - .../pubsub/pubsub_spi/include/pubsub_endpoint.h | 30 +- .../pubsub_spi/include/pubsub_serializer.h | 18 +- .../pubsub/pubsub_spi/include/pubsub_utils.h | 47 +- .../pubsub/pubsub_spi/src/pubsub_admin_match.c | 169 --- bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 270 ++-- bundles/pubsub/pubsub_spi/src/pubsub_utils.c | 31 + .../pubsub/pubsub_spi/src/pubsub_utils_match.c | 230 ++++ .../src/pstm_activator.c | 38 +- .../src/pubsub_topology_manager.c | 1185 +++++++++--------- .../src/pubsub_topology_manager.h | 81 +- libs/framework/include/celix_api.h | 24 +- libs/framework/include/celix_bundle_context.h | 11 + libs/framework/src/bundle_context.c | 14 + libs/framework/src/framework.c | 2 +- libs/utils/include/celix_properties.h | 6 +- libs/utils/include/celix_threads.h | 7 +- libs/utils/src/celix_threads.c | 18 +- libs/utils/src/properties.c | 36 +- 52 files changed, 2570 insertions(+), 1871 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt index e3db995..d7ae3f8 100644 --- a/bundles/pubsub/CMakeLists.txt +++ b/bundles/pubsub/CMakeLists.txt @@ -19,6 +19,8 @@ celix_subproject(PUBSUB "Option to build the pubsub bundles" OFF DEPS UTILS) if (PUBSUB) option(BUILD_PUBSUB_PSA_ZMQ "Build ZeroMQ PubSub Admin (LGPL License)" OFF) + set(BUILD_PUBSUB_PSA_ZMQ OFF) + message(WARNING "TODO enable PSA_ZMQ again, for now disabled because refactoring is needed") if (BUILD_PUBSUB_PSA_ZMQ) message(WARNING "Celix will now contain a 
dependency with a LGPL License (ZeroMQ). For more information about this, consult the pubsub/README.md file.") option(BUILD_ZMQ_SECURITY "Build with security for ZeroMQ." OFF) http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/examples/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt index 58fbd92..41a3406 100644 --- a/bundles/pubsub/examples/CMakeLists.txt +++ b/bundles/pubsub/examples/CMakeLists.txt @@ -29,8 +29,9 @@ set(PUBSUB_CONTAINER_LIBS ${JANSSON_LIBRARY} ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} # UDP Multicast add_celix_container(pubsub_publisher_udp_mc - GROUP pubsub - BUNDLES + GROUP pubsub + BUNDLES + Celix::log_service Celix::shell Celix::shell_tui Celix::pubsub_serializer_json @@ -39,12 +40,17 @@ add_celix_container(pubsub_publisher_udp_mc Celix::pubsub_admin_udp_multicast celix_pubsub_poi_publisher celix_pubsub_poi_publisher2 - ) + PROPERTIES + PSA_UDPMC_VERBOSE=true + PUBSUB_ETCD_DISCOVERY_VERBOSE=true + PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true +) target_link_libraries(pubsub_publisher_udp_mc PRIVATE ${PUBSUB_CONTAINER_LIBS}) -add_celix_container("pubsub_subscriber_udp_mc" - GROUP "pubsub" - BUNDLES +add_celix_container(pubsub_subscriber_udp_mc + GROUP "pubsub" + BUNDLES + Celix::log_service Celix::shell Celix::shell_tui Celix::pubsub_serializer_json @@ -52,11 +58,18 @@ add_celix_container("pubsub_subscriber_udp_mc" Celix::pubsub_topology_manager Celix::pubsub_admin_udp_multicast celix_pubsub_poi_subscriber - ) + PROPERTIES + PSA_UDPMC_VERBOSE=true + PUBSUB_ETCD_DISCOVERY_VERBOSE=true + PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true +) target_link_libraries(pubsub_subscriber_udp_mc PRIVATE ${PUBSUB_CONTAINER_LIBS}) -add_celix_container("pubsub_subscriber2_udp_mc" - GROUP "pubsub" - BUNDLES + + +add_celix_container(pubsub_subscriber2_udp_mc + GROUP "pubsub" + BUNDLES + Celix::log_service Celix::shell Celix::shell_tui 
Celix::pubsub_serializer_json @@ -64,8 +77,13 @@ add_celix_container("pubsub_subscriber2_udp_mc" Celix::pubsub_topology_manager Celix::pubsub_admin_udp_multicast celix_pubsub_poi_subscriber - ) + PROPERTIES + PSA_UDPMC_VERBOSE=true + PUBSUB_ETCD_DISCOVERY_VERBOSE=true + PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true +) target_link_libraries(pubsub_subscriber2_udp_mc PRIVATE ${PUBSUB_CONTAINER_LIBS}) + if (ETCD_CMD AND XTERM_CMD) #Runtime starting a publish and subscriber for udp mc add_celix_runtime(pubsub_rt_upd_mc http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h b/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h index 56ec678..c27ad38 100644 --- a/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h +++ b/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h @@ -16,18 +16,11 @@ *specific language governing permissions and limitations *under the License. 
*/ -/* - * pubsub_publisher_private.h - * - * \date Sep 21, 2010 - * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ #ifndef PUBSUB_PUBLISHER_PRIVATE_H_ #define PUBSUB_PUBLISHER_PRIVATE_H_ -#include <celixbool.h> +#include "celix_api.h" #include <pthread.h> #include "pubsub/publisher.h" @@ -53,8 +46,8 @@ void publisher_stop(pubsub_sender_pt client); void publisher_destroy(pubsub_sender_pt client); -celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service); -celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service); +void publisher_publishSvcAdded(void * handle, void *svc, const celix_properties_t *props); +void publisher_publishSvcRemoved(void * handle, void *svc, const celix_properties_t *props); #endif /* PUBSUB_PUBLISHER_PRIVATE_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c index 0da3ffc..617163c 100644 --- a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c +++ b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c @@ -16,21 +16,12 @@ *specific language governing permissions and limitations *under the License. 
*/ -/* - * ps_pub_activator.c - * - * \date Sep 21, 2010 - * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ #include <sys/cdefs.h> #include <stdlib.h> #include <string.h> -#include "bundle_activator.h" -#include "service_tracker.h" -#include "constants.h" +#include "celix_api.h" #include "pubsub/publisher.h" #include "pubsub_publisher_private.h" @@ -47,48 +38,27 @@ struct publisherActivator { array_list_pt trackerList;//List<service_tracker_pt> }; -celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { - - const char* fwUUID = NULL; - - bundleContext_getProperty(context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); - if(fwUUID==NULL){ +static int pub_start(struct publisherActivator *act, celix_bundle_context_t *ctx) { + const char *fwUUID = celix_bundleContext_getProperty(ctx,OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL); + if (fwUUID==NULL) { printf("PUBLISHER: Cannot retrieve fwUUID.\n"); return CELIX_INVALID_BUNDLE_CONTEXT; } - struct publisherActivator * act = malloc(sizeof(*act)); - - bundle_pt bundle = NULL; - long bundleId = 0; - bundleContext_getBundle(context,&bundle); - bundle_getBundleId(bundle,&bundleId); - arrayList_create(&(act->trackerList)); - act->client = publisher_create(act->trackerList,fwUUID,bundleId); - *userData = act; + bundle_t *bnd = celix_bundleContext_getBundle(ctx); + long bundleId = celix_bundle_getId(bnd); - return CELIX_SUCCESS; -} - -celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) { - - struct publisherActivator * act = (struct publisherActivator *) userData; + act->trackerList = celix_arrayList_create(); + act->client = publisher_create(act->trackerList, fwUUID, bundleId); int i; char filter[128]; for(i=0; PUB_TOPICS[i] != NULL; i++){ const char* topic = PUB_TOPICS[i]; - - bundle_pt bundle = NULL; - long bundleId = 0; - bundleContext_getBundle(context,&bundle); - bundle_getBundleId(bundle,&bundleId); - - 
service_tracker_pt tracker = NULL; memset(filter,0,128); #ifdef USE_SCOPE - char *scope; + char *scope; asprintf(&scope, "my_scope_%d", i); snprintf(filter, 128, "(&(&(%s=%s)(%s=%s))(%s=%s))", (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME, @@ -96,52 +66,36 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) PUBLISHER_SCOPE, scope); free(scope); #else - snprintf(filter, 128, "(&(%s=%s)(%s=%s))", (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC, topic); + snprintf(filter, 128, "(%s=%s)", (char*) PUBSUB_PUBLISHER_TOPIC, topic); #endif - service_tracker_customizer_pt customizer = NULL; - serviceTrackerCustomizer_create(act->client,NULL,publisher_publishSvcAdded,NULL,publisher_publishSvcRemoved,&customizer); - serviceTracker_createWithFilter(context, filter, customizer, &tracker); - - arrayList_add(act->trackerList,tracker); - + celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; + opts.callbackHandle = act->client; + opts.addWithProperties = publisher_publishSvcAdded; + opts.removeWithProperties = publisher_publishSvcRemoved; + opts.filter.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME; + opts.filter.filter = filter; + opts.filter.ignoreServiceLanguage = true; + long trackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); + + celix_arrayList_addLong(act->trackerList,trackerId); } publisher_start(act->client); - for(i=0;i<arrayList_size(act->trackerList);i++){ - service_tracker_pt tracker = arrayList_get(act->trackerList,i); - serviceTracker_open(tracker); - } - - return CELIX_SUCCESS; + return 0; } -celix_status_t bundleActivator_stop(void * userData, bundle_context_pt __attribute__((unused)) context) { - struct publisherActivator * act = (struct publisherActivator *) userData; - int i; +static int pub_stop(struct publisherActivator *act, celix_bundle_context_t *ctx) { + for (int i=0; i < arrayList_size(act->trackerList); i++) { + long trkId 
= celix_arrayList_getLong(act->trackerList,i); + celix_bundleContext_stopTracker(ctx, trkId); + } + publisher_stop(act->client); - for(i=0;i<arrayList_size(act->trackerList);i++){ - service_tracker_pt tracker = arrayList_get(act->trackerList,i); - serviceTracker_close(tracker); - } - publisher_stop(act->client); + publisher_destroy(act->client); + celix_arrayList_destroy(act->trackerList); - return CELIX_SUCCESS; + return 0; } -celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt __attribute__((unused)) context) { - struct publisherActivator * act = (struct publisherActivator *) userData; - int i; - - for(i=0;i<arrayList_size(act->trackerList);i++){ - service_tracker_pt tracker = arrayList_get(act->trackerList,i); - serviceTracker_destroy(tracker); - } - - publisher_destroy(act->client); - arrayList_destroy(act->trackerList); - - free(act); - printf("PUBLISHER: bundleActivator_destroy\n"); - return CELIX_SUCCESS; -} +CELIX_GEN_BUNDLE_ACTIVATOR(struct publisherActivator, pub_start, pub_stop) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c index 9a7aedc..5369a22 100644 --- a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c +++ b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c @@ -134,28 +134,25 @@ void publisher_destroy(pubsub_sender_pt client) { free(client); } -celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service){ - pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)service; +void publisher_publishSvcAdded(void * handle, void *svc, const celix_properties_t *props) { + pubsub_publisher_pt publish_svc = 
(pubsub_publisher_pt)svc; pubsub_sender_pt manager = (pubsub_sender_pt)handle; printf("PUBLISHER: new publish service exported (%s).\n",manager->ident); send_thread_struct_pt data = calloc(1,sizeof(struct send_thread_struct)); - const char *value = NULL; - serviceReference_getProperty(reference, PUBSUB_PUBLISHER_TOPIC, &value); data->service = publish_svc; data->publisher = manager; - data->topic = value; + data->topic = celix_properties_get(props, PUBSUB_PUBLISHER_TOPIC, "!ERROR!"); celix_thread_t *tid = malloc(sizeof(*tid)); stop=false; celixThread_create(tid,NULL,send_thread,(void*)data); hashMap_put(manager->tid_map, publish_svc, tid); - return CELIX_SUCCESS; } -celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service){ +void publisher_publishSvcRemoved(void * handle, void *svc, const celix_properties_t *props) { pubsub_sender_pt manager = (pubsub_sender_pt)handle; - celix_thread_t *tid = hashMap_get(manager->tid_map, service); + celix_thread_t *tid = hashMap_get(manager->tid_map, svc); #if defined(__APPLE__) && defined(__MACH__) uint64_t threadid; @@ -168,5 +165,4 @@ celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt r stop=true; celixThread_join(*tid,NULL); free(tid); - return CELIX_SUCCESS; } http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt index d90d605..94ce5cc 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt +++ b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt @@ -23,9 +23,11 @@ add_celix_bundle(celix_pubsub_admin_udp_multicast GROUP "Celix/PubSub" SOURCES src/psa_activator.c - src/pubsub_admin_impl.c - src/topic_subscription.c - src/topic_publication.c + src/pubsub_udpmc_admin.c + src/pubsub_udpmc_topic_sender.c + 
#src/pubsub_admin_impl.c + #src/topic_subscription.c + #src/topic_publication.c src/large_udp.c ) target_include_directories(celix_pubsub_admin_udp_multicast PRIVATE http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c index cd4ee07..406720e 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c @@ -16,126 +16,83 @@ *specific language governing permissions and limitations *under the License. */ -/* - * psa_activator.c - * - * \date Sep 30, 2011 - * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#include <stdlib.h> - -#include "bundle_activator.h" -#include "service_registration.h" -#include "service_tracker.h" - -#include "pubsub_admin_impl.h" -struct activator { - pubsub_admin_pt admin; - pubsub_admin_service_pt adminService; - service_registration_pt registration; - service_tracker_pt serializerTracker; -}; -celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator; +#include <stdlib.h> - activator = calloc(1, sizeof(*activator)); - if (!activator) { - status = CELIX_ENOMEM; - } - else{ - *userData = activator; - - status = pubsubAdmin_create(context, &(activator->admin)); - - if(status == CELIX_SUCCESS){ - service_tracker_customizer_pt customizer = NULL; - status = serviceTrackerCustomizer_create(activator->admin, - NULL, - pubsubAdmin_serializerAdded, - NULL, - pubsubAdmin_serializerRemoved, - &customizer); - if(status == CELIX_SUCCESS){ - status = serviceTracker_create(context, PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker)); - if(status 
!= CELIX_SUCCESS){ - serviceTrackerCustomizer_destroy(customizer); - pubsubAdmin_destroy(activator->admin); - } - } - else{ - pubsubAdmin_destroy(activator->admin); - } - } - } +#include "celix_api.h" +#include "pubsub_serializer.h" +#include "log_helper.h" - return status; -} +#include "pubsub_admin.h" +#include "pubsub_udpmc_admin.h" +#include "../../../shell/shell/include/command.h" -celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - pubsub_admin_service_pt pubsubAdminSvc = calloc(1, sizeof(*pubsubAdminSvc)); +typedef struct psa_udpmc_activator { + log_helper_t *logHelper; - if (!pubsubAdminSvc) { - status = CELIX_ENOMEM; - } - else{ - pubsubAdminSvc->admin = activator->admin; + pubsub_udpmc_admin_t *admin; - pubsubAdminSvc->addPublication = pubsubAdmin_addPublication; - pubsubAdminSvc->removePublication = pubsubAdmin_removePublication; + pubsub_admin_service_t adminService; + long adminSvcId; - pubsubAdminSvc->addSubscription = pubsubAdmin_addSubscription; - pubsubAdminSvc->removeSubscription = pubsubAdmin_removeSubscription; + command_service_t cmdSvc; + long cmdSvcId; +} psa_udpmc_activator_t; - pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications; - pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions; +int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) { + act->adminSvcId = -1L; + act->cmdSvcId = -1L; - pubsubAdminSvc->matchEndpoint = pubsubAdmin_matchEndpoint; + logHelper_create(ctx, &act->logHelper); + logHelper_start(act->logHelper); - activator->adminService = pubsubAdminSvc; + act->admin = pubsub_udpmcAdmin_create(ctx, act->logHelper); + celix_status_t status = act->admin != NULL ? 
CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION; - status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration); + //register pubsub admin service + if (status == CELIX_SUCCESS) { + pubsub_admin_service_t *psaSvc = &act->adminService; + psaSvc->handle = act->admin; + psaSvc->matchPublisher = pubsub_udpmcAdmin_matchPublisher; + psaSvc->matchSubscriber = pubsub_udpmcAdmin_matchSubscriber; + psaSvc->matchEndpoint = pubsub_udpmcAdmin_matchEndpoint; + psaSvc->setupTopicSender = pubsub_udpmcAdmin_setupTopicSender; + psaSvc->teardownTopicSender = pubsub_udpmcAdmin_teardownTopicSender; + psaSvc->setupTopicReciever = pubsub_udpmcAdmin_setupTopicReciever; + psaSvc->teardownTopicReciever = pubsub_udpmcAdmin_teardownTopicReciever; + psaSvc->addEndpoint = pubsub_udpmcAdmin_addEndpoint; + psaSvc->removeEndpoint = pubsub_udpmcAdmin_removeEndpoint; - status += serviceTracker_open(activator->serializerTracker); + celix_properties_t *props = celix_properties_create(); + celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_UDPMC_ADMIN_TYPE); + act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props); } + //register shell command service + { + act->cmdSvc.handle = act->admin; + act->cmdSvc.executeCommand = pubsub_udpmcAdmin_executeCommand; + celix_properties_t *props = celix_properties_create(); + celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_udpmc"); + celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_udpmc"); + celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the UDPMC PSA"); + celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props); + } return status; } -celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - - status += 
serviceTracker_close(activator->serializerTracker); - status += serviceRegistration_unregister(activator->registration); - - activator->registration = NULL; - - free(activator->adminService); - activator->adminService = NULL; - - return status; -} - -celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - - serviceTracker_destroy(activator->serializerTracker); - pubsubAdmin_destroy(activator->admin); - activator->admin = NULL; +int psa_udpmc_stop(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) { + celix_bundleContext_unregisterService(ctx, act->adminSvcId); + celix_bundleContext_unregisterService(ctx, act->cmdSvcId); + pubsub_udpmcAdmin_destroy(act->admin); - free(activator); + logHelper_stop(act->logHelper); + logHelper_destroy(&act->logHelper); - return status; + return CELIX_SUCCESS; } - +CELIX_GEN_BUNDLE_ACTIVATOR(psa_udpmc_activator_t, psa_udpmc_start, psa_udpmc_stop); http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c index 0638efb..2f62f6f 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c @@ -16,13 +16,6 @@ *specific language governing permissions and limitations *under the License. 
*/ -/* - * pubsub_admin_impl.c - * - * \date Sep 30, 2011 - * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ #include <stdio.h> #include <stdlib.h> @@ -61,20 +54,20 @@ #include "topic_publication.h" #include "pubsub_endpoint.h" #include "pubsub/subscriber.h" -#include "pubsub_admin_match.h" +#include "pubsub_utils.h" static const char *DEFAULT_MC_IP = "224.100.1.1"; static char *DEFAULT_MC_PREFIX = "224.100"; static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip); -static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); -static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); +static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_t *admin,celix_properties_t *subEP); +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_t *admin,celix_properties_t *subEP); -static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsub_endpoint_pt ep, pubsub_serializer_service_t **out, const char **serType); -static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication); -static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication); +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_t *admin, celix_properties_t *ep, pubsub_serializer_service_t **out, const char **serType); +static void connectTopicPubSubToSerializer(pubsub_admin_t *admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication); +static void disconnectTopicPubSubFromSerializer(pubsub_admin_t *admin,void *topicPubSub,bool isPublication); -celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) { +celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_t 
**admin) { celix_status_t status = CELIX_SUCCESS; *admin = calloc(1, sizeof(**admin)); @@ -239,7 +232,7 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad } -celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin) +celix_status_t pubsubAdmin_destroy(pubsub_admin_t *admin) { celix_status_t status = CELIX_SUCCESS; @@ -325,7 +318,7 @@ celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin) return status; } -static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_t *admin,celix_properties_t *subEP){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&admin->subscriptionsLock); @@ -342,7 +335,7 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu else{ if (admin->verbose) { printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n", - properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME)); } celixThreadMutex_lock(&admin->noSerializerPendingsLock); @@ -362,9 +355,9 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu if(topic_publishers!=NULL){ for(i=0;i<arrayList_size(topic_publishers);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i); - if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); + celix_properties_t *pubEP = arrayList_get(topic_publishers,i); + if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){ + status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); } } arrayList_destroy(topic_publishers); @@ -380,9 +373,9 @@ static celix_status_t 
pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter); if(ext_pub_list!=NULL){ for(i=0;i<arrayList_size(ext_pub_list);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); - if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); + celix_properties_t *pubEP = arrayList_get(ext_pub_list,i); + if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){ + status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); } } } @@ -409,10 +402,10 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu return status; } -celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ +celix_status_t pubsubAdmin_addSubscription(pubsub_admin_t *admin, celix_properties_t *subEP){ celix_status_t status = CELIX_SUCCESS; - if(strcmp(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME),PUBSUB_ANY_SUB_TOPIC)==0){ + if(strcmp(properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME),PUBSUB_ANY_SUB_TOPIC)==0){ return pubsubAdmin_addAnySubscription(admin,subEP); } @@ -422,7 +415,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint celixThreadMutex_lock(&admin->localPublicationsLock); celixThreadMutex_lock(&admin->externalPublicationsLock); - char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME)); service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic); array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); @@ -437,11 +430,11 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint pubsub_serializer_service_t *best_serializer = NULL; const char *serType = NULL; if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){ - status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, (char*) properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, &subscription); + status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, (char*) properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, &subscription); } else { if (admin->verbose) { printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. 
Adding it to pending list.\n", - properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME)); } celixThreadMutex_lock(&admin->noSerializerPendingsLock); @@ -450,6 +443,9 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint } if (status==CELIX_SUCCESS){ + //got type and serializer -> update endpoint + celix_properties_set(subEP, PUBSUB_ENDPOINT_ADMIN_TYPE, PSA_UDPMC_PUBSUB_ADMIN_TYPE); + celix_properties_set(subEP, PUBSUB_ENDPOINT_SERIALIZER, serType); /* Try to connect internal publishers */ if(factory!=NULL){ @@ -458,9 +454,9 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint if(topic_publishers!=NULL){ for(i=0;i<arrayList_size(topic_publishers);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i); - if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); + celix_properties_t *pubEP = arrayList_get(topic_publishers,i); + if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){ + status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); } } arrayList_destroy(topic_publishers); @@ -471,9 +467,9 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint /* Look also for external publishers */ if(ext_pub_list!=NULL){ for(i=0;i<arrayList_size(ext_pub_list);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); - if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); + celix_properties_t *pubEP = arrayList_get(ext_pub_list,i); + 
if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){ + status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); } } } @@ -505,38 +501,38 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint if (admin->verbose) { printf("[PSA_UDPMC] Added subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n", - properties_get(subEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID), - properties_get(subEP->properties, PUBSUB_ENDPOINT_UUID), - properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), - properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + properties_get(subEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID), + properties_get(subEP, PUBSUB_ENDPOINT_UUID), + properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME)); printf("[PSA_UDPMC] \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n", - properties_get(subEP->properties, PUBSUB_ADMIN_TYPE_KEY), - properties_get(subEP->properties, PUBSUB_SERIALIZER_TYPE_KEY), - properties_get(subEP->properties, PUBSUB_ENDPOINT_TYPE)); - printf("[PSA_UDPMC] \t [endpoint socket address = %s]\n", properties_get(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); + properties_get(subEP, PUBSUB_ADMIN_TYPE_KEY), + properties_get(subEP, PUBSUB_SERIALIZER_TYPE_KEY), + properties_get(subEP, PUBSUB_ENDPOINT_TYPE)); + printf("[PSA_UDPMC] \t [endpoint socket address = %s]\n", properties_get(subEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); } return status; } -celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ +celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_t *admin,celix_properties_t *subEP){ celix_status_t status = CELIX_SUCCESS; if (admin->verbose) { printf("[PSA_UDPMC] Removing subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n", - properties_get(subEP->properties, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID), - properties_get(subEP->properties, PUBSUB_ENDPOINT_UUID), - properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), - properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + properties_get(subEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID), + properties_get(subEP, PUBSUB_ENDPOINT_UUID), + properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME)); printf("[PSA_UDPMC] \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n", - properties_get(subEP->properties, PUBSUB_ADMIN_TYPE_KEY), - properties_get(subEP->properties, PUBSUB_SERIALIZER_TYPE_KEY), - properties_get(subEP->properties, PUBSUB_ENDPOINT_TYPE)); - printf("[PSA_UDPMC] \t [endpoint url = %s]\n", properties_get(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); + properties_get(subEP, PUBSUB_ADMIN_TYPE_KEY), + properties_get(subEP, PUBSUB_SERIALIZER_TYPE_KEY), + properties_get(subEP, PUBSUB_ENDPOINT_TYPE)); + printf("[PSA_UDPMC] \t [endpoint url = %s]\n", properties_get(subEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); } - char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME)); celixThreadMutex_lock(&admin->subscriptionsLock); topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); @@ -565,7 +561,7 @@ celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo } -celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){ +celix_status_t pubsubAdmin_addPublication(pubsub_admin_t *admin,celix_properties_t *pubEP){ celix_status_t status = CELIX_SUCCESS; const char* fwUUID = NULL; @@ -575,22 +571,22 @@ celix_status_t 
pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_ return CELIX_INVALID_BUNDLE_CONTEXT; } - const char *epFwUUID = properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID); + const char *epFwUUID = properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID); bool isOwn = strncmp(fwUUID, epFwUUID, 128) == 0; if (isOwn) { //should be null, willl be set in this call - assert(properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY) == NULL); - assert(properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY) == NULL); + assert(properties_get(pubEP, PUBSUB_ADMIN_TYPE_KEY) == NULL); + assert(properties_get(pubEP, PUBSUB_SERIALIZER_TYPE_KEY) == NULL); } if (isOwn) { - properties_set(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE); + properties_set(pubEP, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE); } - char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME)); - if ((strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) && (properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) == NULL)) { + if ((strcmp(properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) && (properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) == NULL)) { celixThreadMutex_lock(&admin->localPublicationsLock); @@ -603,11 +599,11 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_ if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer, &serType)) == CELIX_SUCCESS){ status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, serType, admin->mcIpAddress, &pub); if (isOwn) { - properties_set(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY, 
serType); + properties_set(pubEP, PUBSUB_SERIALIZER_TYPE_KEY, serType); } } else { printf("PSA_UDP_MC: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n", - properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME)); celixThreadMutex_lock(&admin->noSerializerPendingsLock); arrayList_add(admin->noSerializerPublications,pubEP); @@ -622,8 +618,8 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_ } } else { printf("PSA_UDP_MC: Cannot create a topicPublication for scope=%s, topic=%s.\n", - properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), - properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME)); } } else { //just add the new EP to the list @@ -655,7 +651,7 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_ array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub); int i; for (i = 0; i < arrayList_size(pendingSubList); i++) { - pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i); + celix_properties_t *subEP = arrayList_get(pendingSubList, i); pubsubAdmin_addSubscription(admin, subEP); } hashMap_remove(admin->pendingSubscriptions, scope_topic); @@ -670,14 +666,14 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_ celixThreadMutex_lock(&admin->subscriptionsLock); topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic); - if (sub != NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) { - pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); + if (sub != NULL && properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) { + 
pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); } /* And check also for ANY subscription */ topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC); - if (any_sub != NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) { - pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); + if (any_sub != NULL && properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) { + pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); } free(scope_topic); @@ -686,16 +682,16 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_ if (admin->verbose) { printf("PSA_UDPMC: Added publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n", - properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID), - properties_get(pubEP->properties, PUBSUB_ENDPOINT_UUID), - properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), - properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID), + properties_get(pubEP, PUBSUB_ENDPOINT_UUID), + properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME)); printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n", - properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY), - properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY), - properties_get(pubEP->properties, PUBSUB_ENDPOINT_TYPE)); + properties_get(pubEP, PUBSUB_ADMIN_TYPE_KEY), + properties_get(pubEP, PUBSUB_SERIALIZER_TYPE_KEY), + properties_get(pubEP, PUBSUB_ENDPOINT_TYPE)); printf("PSA_UDPMC: \t [endpoint url = %s, own = %i]\n", - properties_get(pubEP->properties, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY), + properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY), isOwn); } @@ -703,21 +699,21 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_ } -celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){ +celix_status_t pubsubAdmin_removePublication(pubsub_admin_t *admin,celix_properties_t *pubEP){ celix_status_t status = CELIX_SUCCESS; int count = 0; if (admin->verbose) { - printf("PSA_UDPMC: Adding publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n", - properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID), - properties_get(pubEP->properties, PUBSUB_ENDPOINT_UUID), - properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), - properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + printf("PSA_UDPMC: Remove publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n", + properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID), + properties_get(pubEP, PUBSUB_ENDPOINT_UUID), + properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME)); printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n", - properties_get(pubEP->properties, PUBSUB_ADMIN_TYPE_KEY), - properties_get(pubEP->properties, PUBSUB_SERIALIZER_TYPE_KEY), - properties_get(pubEP->properties, PUBSUB_ENDPOINT_TYPE)); - printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); + properties_get(pubEP, PUBSUB_ADMIN_TYPE_KEY), + properties_get(pubEP, PUBSUB_SERIALIZER_TYPE_KEY), + properties_get(pubEP, PUBSUB_ENDPOINT_TYPE)); + printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); } const char* fwUUID = NULL; @@ -727,9 +723,9 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n"); return CELIX_INVALID_BUNDLE_CONTEXT; 
} - char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME)); - if(strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){ + if(strcmp(properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){ celixThreadMutex_lock(&admin->localPublicationsLock); service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); @@ -757,7 +753,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi int i; bool found = false; for(i=0;!found && i<arrayList_size(ext_pub_list);i++){ - pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); + celix_properties_t *p = arrayList_get(ext_pub_list,i); found = pubsubEndpoint_equals(pubEP,p); if (found){ arrayList_remove(ext_pub_list,i); @@ -765,8 +761,8 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi } // Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic) for(i=0; i<arrayList_size(ext_pub_list);i++) { - pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); - if (strcmp(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY),properties_get(p->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)) == 0) { + celix_properties_t *p = arrayList_get(ext_pub_list,i); + if (strcmp(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY),properties_get(p, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)) == 0) { count++; } } @@ -788,14 +784,14 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi celixThreadMutex_lock(&admin->subscriptionsLock); topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); - if(sub!=NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){ - pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); + if(sub!=NULL && properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){ + pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); } /* And check also for ANY subscription */ topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); - if(any_sub!=NULL && properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){ - pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); + if(any_sub!=NULL && properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){ + pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)); } free(scope_topic); @@ -805,7 +801,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi } -celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scope, char* topic){ +celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_t *admin,char *scope, char* topic){ celix_status_t status = CELIX_SUCCESS; printf("PSA_UDP_MC: Closing all publications for scope=%s,topic=%s\n", scope, topic); @@ -831,7 +827,7 @@ celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scop } -celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char *scope, char* topic){ +celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_t *admin,char *scope, char* topic){ celix_status_t status = 
CELIX_SUCCESS; printf("PSA_UDP_MC: Closing all subscriptions\n"); @@ -891,10 +887,10 @@ static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip) } #endif -static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ +static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_t *admin,celix_properties_t *subEP){ celix_status_t status = CELIX_SUCCESS; - char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME)); array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic); if(pendingListPerTopic==NULL){ arrayList_create(&pendingListPerTopic); @@ -921,7 +917,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r return CELIX_SERVICE_EXCEPTION; } - pubsub_admin_pt admin = (pubsub_admin_pt)handle; + pubsub_admin_t *admin = (pubsub_admin_t*)handle; celixThreadMutex_lock(&admin->serializerListLock); arrayList_add(admin->serializerList, reference); celixThreadMutex_unlock(&admin->serializerListLock); @@ -930,7 +926,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r celixThreadMutex_lock(&admin->noSerializerPendingsLock); for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){ - pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i); + celix_properties_t *ep = arrayList_get(admin->noSerializerSubscriptions,i); pubsub_serializer_service_t *best_serializer = NULL; pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL); if(best_serializer != NULL){ /* Finally we have a valid serializer! 
*/ @@ -939,7 +935,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r } for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){ - pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i); + celix_properties_t *ep = arrayList_get(admin->noSerializerPublications,i); pubsub_serializer_service_t *best_serializer = NULL; pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL); if(best_serializer != NULL){ /* Finally we have a valid serializer! */ @@ -958,7 +954,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){ - pubsub_admin_pt admin = (pubsub_admin_pt)handle; + pubsub_admin_t *admin = (pubsub_admin_t*)handle; int i=0, j=0; const char *serType = NULL; @@ -987,12 +983,12 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt /* Get the endpoints that are going to be orphan */ array_list_pt pubList = pubsub_topicPublicationGetPublisherList(topicPub); for(j=0;j<arrayList_size(pubList);j++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubList,j); + celix_properties_t *pubEP = arrayList_get(pubList,j); /* Remove the publication */ pubsubAdmin_removePublication(admin, pubEP); /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */ - if(properties_get(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){ - properties_unset(pubEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY); + if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){ + properties_unset(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY); } /* Add the orphan endpoint to the noSerializer pending list */ celixThreadMutex_lock(&admin->noSerializerPendingsLock); @@ -1038,12 +1034,12 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt /* Get 
the endpoints that are going to be orphan */ array_list_pt subList = pubsub_topicSubscriptionGetSubscribersList(topicSub); for(j=0;j<arrayList_size(subList);j++){ - pubsub_endpoint_pt subEP = (pubsub_endpoint_pt)arrayList_get(subList,j); + celix_properties_t *subEP = arrayList_get(subList,j); /* Remove the subscription */ pubsubAdmin_removeSubscription(admin, subEP); /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */ - if(properties_get(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){ - properties_unset(subEP->properties, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY); + if(properties_get(subEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){ + properties_unset(subEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY); } /* Add the orphan endpoint to the noSerializer pending list */ celixThreadMutex_lock(&admin->noSerializerPendingsLock); @@ -1084,24 +1080,10 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt return CELIX_SUCCESS; } -celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){ - celix_status_t status = CELIX_SUCCESS; - - const char *fwUuid = NULL; - bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUuid); - if (fwUuid == NULL) { - return CELIX_ILLEGAL_STATE; - } - - celixThreadMutex_lock(&admin->serializerListLock); - status = pubsub_admin_match(endpoint, PSA_UDPMC_PUBSUB_ADMIN_TYPE, fwUuid, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList,score); - celixThreadMutex_unlock(&admin->serializerListLock); - - return status; -} /* This one recall the same logic as in the match function */ -static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsub_endpoint_pt ep, pubsub_serializer_service_t **out, const char **serType){ +/* +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_t *admin, celix_properties_t *ep, 
pubsub_serializer_service_t **out, const char **serType){ celix_status_t status = CELIX_SUCCESS; pubsub_serializer_service_t *serSvc = NULL; @@ -1122,9 +1104,9 @@ static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsu *out = serSvc; return status; -} +}*/ -static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){ +static void connectTopicPubSubToSerializer(pubsub_admin_t *admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){ celixThreadMutex_lock(&admin->usedSerializersLock); @@ -1140,7 +1122,7 @@ static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializ } -static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){ +static void disconnectTopicPubSubFromSerializer(pubsub_admin_t *admin,void *topicPubSub,bool isPublication){ celixThreadMutex_lock(&admin->usedSerializersLock); @@ -1157,3 +1139,83 @@ static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topi celixThreadMutex_unlock(&admin->usedSerializersLock); } + +celix_status_t pubsubAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) { + pubsub_admin_t *admin = handle; + celix_status_t status = CELIX_SUCCESS; + celixThreadMutex_lock(&admin->serializerListLock); + long serializerSvcId = -1L; + double score = pubsub_utils_matchPublisher(admin->bundle_context, svcRequesterBndId, svcFilter->filterStr, PSA_UDPMC_PUBSUB_ADMIN_TYPE, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList, &serializerSvcId); + celixThreadMutex_unlock(&admin->serializerListLock); + if (outScore != NULL) { + *outScore = score; + } + if (outSerializerSvcId != NULL) { + *outSerializerSvcId = serializerSvcId; + } + return status; +} + +celix_status_t pubsubAdmin_matchSubscriber(void *handle, 
long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) { + pubsub_admin_t *admin = handle; + celix_status_t status = CELIX_SUCCESS; + celixThreadMutex_lock(&admin->serializerListLock); + long serializerSvcId = -1L; + double score = pubsub_utils_matchSubscriber(admin->bundle_context, svcProviderBndId, svcProperties, PSA_UDPMC_PUBSUB_ADMIN_TYPE, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList, &serializerSvcId); + celixThreadMutex_unlock(&admin->serializerListLock); + if (outScore != NULL) { + *outScore = score; + } + if (outSerializerSvcId != NULL) { + *outSerializerSvcId = serializerSvcId; + } + return status; +} + +celix_status_t pubsubAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *outScore) { + pubsub_admin_t *admin = handle; + celix_status_t status = CELIX_SUCCESS; + celixThreadMutex_lock(&admin->serializerListLock); + long serializerSvcId = -1L; + double score = pubsub_utils_matchEndpoint(admin->bundle_context, endpoint, PSA_UDPMC_PUBSUB_ADMIN_TYPE, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList, &serializerSvcId); + celixThreadMutex_unlock(&admin->serializerListLock); + if (outScore != NULL) { + *outScore = score; + } + return status; +} + +celix_status_t pubsubAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint) { + pubsub_admin_t *admin = handle; + celix_status_t status = CELIX_SUCCESS; + return status; +} +celix_status_t pubsubAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) { + pubsub_admin_t *admin = handle; + celix_status_t status = CELIX_SUCCESS; + return status; +} + +celix_status_t pubsubAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) { + pubsub_admin_t *admin = handle; + 
celix_status_t status = CELIX_SUCCESS; + return status; +} + +celix_status_t pubsubAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic) { + pubsub_admin_t *admin = handle; + celix_status_t status = CELIX_SUCCESS; + return status; +} + +celix_status_t pubsubAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) { + pubsub_admin_t *admin = handle; + celix_status_t status = CELIX_SUCCESS; + return status; +} + +celix_status_t pubsubAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) { + pubsub_admin_t *admin = handle; + celix_status_t status = CELIX_SUCCESS; + return status; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h index b82e8a1..7618a6e 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h @@ -32,9 +32,10 @@ #include "log_helper.h" +#define PUBSUB_PSA_UDPMC_PSA_TYPE "udp_mc" #define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY "pubsub.udpmc.socket_address" -struct pubsub_admin { +typedef struct pubsub_admin { bundle_context_pt bundle_context; log_helper_pt loghelper; @@ -77,24 +78,34 @@ struct pubsub_admin { double defaultScore; bool verbose; -}; +} pubsub_admin_t; -celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin); -celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin); +celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_t **admin); +celix_status_t pubsubAdmin_destroy(pubsub_admin_t *admin); -celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); -celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt 
subEP); -celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP); -celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP); +void pubsubAdmin_addSerializer(void * handle, void *svc, const celix_properties_t *properties); +void pubsubAdmin_removeSerializer(void * handle, void *svc, const celix_properties_t *properties); -celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic); -celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope, char* topic); -celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service); -celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service); -celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score); +//for the pubsub_admin_service + +celix_status_t pubsubAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId); +celix_status_t pubsubAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId); +celix_status_t pubsubAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *score); + +//note endpoint is owned by caller +celix_status_t pubsubAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint); +celix_status_t pubsubAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic); + +//note endpoint is owned by caller +celix_status_t pubsubAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint); +celix_status_t pubsubAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic); + +celix_status_t 
pubsubAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint); +celix_status_t pubsubAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint); + #endif /* PUBSUB_ADMIN_UDP_MC_IMPL_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c new file mode 100644 index 0000000..650e439 --- /dev/null +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c @@ -0,0 +1,335 @@ + +#include <memory.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netdb.h> +#include <ifaddrs.h> +#include <pubsub_endpoint.h> + +#include "pubsub_utils.h" +#include "pubsub_udpmc_admin.h" +#include "pubsub_psa_udpmc_constants.h" +#include "pubsub_udpmc_topic_sender.h" + +#define PUBSUB_UDPMC_MC_IP_DEFAULT "224.100.1.1" + +#define LOG_DEBUG(...) \ + logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) +#define LOG_INFO(...) \ + logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__); +#define LOG_WARN(...) \ + logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__); +#define LOG_ERROR(...) 
\ + logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__) + + +static celix_status_t udpmc_getIpAddress(const char* interface, char** ip); + +pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) { + pubsub_udpmc_admin_t *psa = calloc(1, sizeof(*psa)); + psa->ctx = ctx; + psa->log = logHelper; + psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_UDPMC_VERBOSE_KEY, PUBSUB_UDPMC_VERBOSE_DEFAULT); + psa->fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL); + + int b0 = 0, b1 = 0, b2 = 0, b3 = 0; + + char *mc_ip = NULL; + char *if_ip = NULL; + int sendSocket = -1; + + const char *mcIpProp = celix_bundleContext_getProperty(ctx,PUBSUB_UDPMC_IP_KEY , NULL); + if(mcIpProp != NULL) { + mc_ip = strdup(mcIpProp); + } + + + const char *mc_prefix = celix_bundleContext_getProperty(ctx, + PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY, + PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT); + const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_UDPMC_ITF_KEY, NULL); + if (udpmc_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) { + LOG_WARN("[PSA_UDPMC] Could not retrieve IP address for interface %s", interface); + } else if (psa->verbose) { + LOG_INFO("[PSA_UDPMC] Using IP address %s", if_ip); + } + + if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 4) { + logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, "[PSA_UDPMC] Could not parse IP address %s", if_ip); + b2 = 1; + b3 = 1; + } + + asprintf(&mc_ip, "%s.%d.%d",mc_prefix, b2, b3); + + sendSocket = socket(AF_INET, SOCK_DGRAM, 0); + if(sendSocket == -1) { + LOG_ERROR("[PSA_UDPMC] Error creating socket: %s", strerror(errno)); + } else { + char loop = 1; + int rc = setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)); + if(rc != 0) { + LOG_ERROR("[PSA_UDPMC] Error setsockopt(IP_MULTICAST_LOOP): %s", strerror(errno)); + } + if (rc == 0) { + struct in_addr multicast_interface; + inet_aton(if_ip, 
&multicast_interface); + rc = setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)); + if (rc != 0) { + LOG_ERROR("[PSA_UDPMC] Error setsockopt(IP_MULTICAST_IF): %s", strerror(errno)); + } + } + if (rc == 0) { + psa->sendSocket = sendSocket; + } + } + + if (if_ip != NULL) { + psa->ifIpAddress = if_ip; + } else { + psa->ifIpAddress = strdup("127.0.0.1"); + + } + if (psa->verbose) { + LOG_INFO("[PSA_UDPMC] Using %s as interface for multicast communication", psa->ifIpAddress); + } + + + if (mc_ip != NULL) { + psa->mcIpAddress = mc_ip; + } else { + psa->mcIpAddress = strdup(PUBSUB_UDPMC_MC_IP_DEFAULT); + } + if (psa->verbose) { + LOG_INFO("[PSA_UDPMC] Using %s for service annunciation", psa->mcIpAddress); + } + + psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDPMC_DEFAULT_SCORE_KEY, PSA_UDPMC_DEFAULT_SCORE); + psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDPMC_QOS_SAMPLE_SCORE_KEY, PSA_UDPMC_DEFAULT_QOS_SAMPLE_SCORE); + psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDPMC_QOS_CONTROL_SCORE_KEY, PSA_UDPMC_DEFAULT_QOS_CONTROL_SCORE); + + celixThreadMutex_create(&psa->topicSenders.mutex, NULL); + psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + + celixThreadMutex_create(&psa->topicReceivers.mutex, NULL); + psa->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + + celixThreadMutex_create(&psa->connectedEndpoints.mutex, NULL); + psa->connectedEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + + return psa; +} + +void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) { + if (psa == NULL) { + return; + } + + //note: assuming all psa registered services and service trackers are removed. 
+ + celixThreadMutex_destroy(&psa->topicSenders.mutex); + hashMap_destroy(psa->topicSenders.map, true, false); + + celixThreadMutex_destroy(&psa->topicReceivers.mutex); + hashMap_destroy(psa->topicReceivers.map, true, false); + + celixThreadMutex_destroy(&psa->connectedEndpoints.mutex); + hashMap_destroy(psa->connectedEndpoints.map, true, false); + + free(psa->mcIpAddress); + free(psa->ifIpAddress); + free(psa); +} + +celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) { + pubsub_udpmc_admin_t *psa = handle; + LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchPublisher"); + celix_status_t status = CELIX_SUCCESS; + double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_UDPMC_ADMIN_TYPE, + psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId); + *outScore = score; + + return status; +} + +celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) { + pubsub_udpmc_admin_t *psa = handle; + LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchSubscriber"); + celix_status_t status = CELIX_SUCCESS; + double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_UDPMC_ADMIN_TYPE, + psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId); + if (outScore != NULL) { + *outScore = score; + } + return status; +} + +celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *outScore) { + pubsub_udpmc_admin_t *psa = handle; + LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchEndpoint"); + celix_status_t status = CELIX_SUCCESS; + double score = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_UDPMC_ADMIN_TYPE, + psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, NULL); + if (outScore != 
NULL) { + *outScore = score; + } + return status; +} + +celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outPublisherEndpoint) { + pubsub_udpmc_admin_t *psa = handle; + celix_status_t status = CELIX_SUCCESS; + + //1) Create TopicSender + //2) Store TopicSender + //3) Connect existing endpoints + //4) set outPublisherEndpoint + + celix_properties_t *newEndpoint = NULL; + + char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); + celixThreadMutex_lock(&psa->topicSenders.mutex); + pubsub_updmc_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key); + if (sender == NULL) { + sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId); + const char *psaType = pubsub_udpmcTopicSender_psaType(sender); + const char *serType = pubsub_udpmcTopicSender_serializerType(sender); + newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, + PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL); + bool valid = pubsubEndpoint_isValid(newEndpoint, true, true); + if (!valid) { + LOG_ERROR("[PSA UDPMC] Error creating a valid TopicSender. 
Endpoints are not valid"); + celix_properties_destroy(newEndpoint); + pubsub_udpmcTopicSender_destroy(sender); + free(key); + } else { + hashMap_put(psa->topicSenders.map, key, sender); + //TODO connect endpoints to sender + } + } else { + free(key); + LOG_ERROR("[PSA_UDPMC] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic); + } + celixThreadMutex_unlock(&psa->topicSenders.mutex); + + + if (newEndpoint != NULL && outPublisherEndpoint != NULL) { + *outPublisherEndpoint = newEndpoint; + } + + return status; +} + +celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) { + pubsub_udpmc_admin_t *psa = handle; + celix_status_t status = CELIX_SUCCESS; + + //1) Find and remove TopicSender from map + //2) destroy topic sender + + char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); + celixThreadMutex_lock(&psa->topicSenders.mutex); + hash_map_entry_t *entry = hashMap_getEntry(psa->topicSenders.map, key); + if (entry != NULL) { + char *mapKey = hashMapEntry_getKey(entry); + pubsub_updmc_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key); + free(mapKey); + //TODO disconnect endpoints to sender + pubsub_udpmcTopicSender_destroy(sender); + } else { + LOG_ERROR("[PSA UDPMC] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic); + } + celixThreadMutex_unlock(&psa->topicSenders.mutex); + free(key); + + return status; +} + +celix_status_t pubsub_udpmcAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) { + pubsub_udpmc_admin_t *psa = handle; + LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_setupTopicReciever. 
scope/topic: %s/%s", scope, topic); + celix_status_t status = CELIX_SUCCESS; + return status; +} + +celix_status_t pubsub_udpmcAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic) { + pubsub_udpmc_admin_t *psa = handle; + LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_teardownTopicReciever. scope/topic: %s/%s", scope, topic); + celix_status_t status = CELIX_SUCCESS; + return status; +} + +celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) { + pubsub_udpmc_admin_t *psa = handle; + LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_addEndpoint"); + celix_status_t status = CELIX_SUCCESS; + return status; +} + +celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) { + pubsub_udpmc_admin_t *psa = handle; + LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_removeEndpoint"); + celix_status_t status = CELIX_SUCCESS; + return status; +} + +celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) { + pubsub_udpmc_admin_t *psa = handle; + celix_status_t status = CELIX_SUCCESS; + + fprintf(out, "\nTopic Senders:\n"); + celixThreadMutex_lock(&psa->topicSenders.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_updmc_topic_sender_t *sender = hashMapIterator_nextValue(&iter); + const char *psaType = pubsub_udpmcTopicSender_psaType(sender); + const char *serType = pubsub_udpmcTopicSender_serializerType(sender); + const char *scope = pubsub_udpmcTopicSender_scope(sender); + const char *topic = pubsub_udpmcTopicSender_topic(sender); + const char *url = pubsub_udpmcTopicSender_socketAddress(sender); + fprintf(out, "|- Topic Sender %s/%s\n", scope, topic); + fprintf(out, " |- psa type = %s\n", psaType); + fprintf(out, " |- serializer type = %s\n", serType); + fprintf(out, " |- url = %s\n", url); + } + 
celixThreadMutex_unlock(&psa->topicSenders.mutex); + fprintf(out, "\n"); + + //TODO topic receivers + + return status; +} + +#ifndef ANDROID +static celix_status_t udpmc_getIpAddress(const char* interface, char** ip) { + celix_status_t status = CELIX_BUNDLE_EXCEPTION; + + struct ifaddrs *ifaddr, *ifa; + char host[NI_MAXHOST]; + + if (getifaddrs(&ifaddr) != -1) + { + for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next) + { + if (ifa->ifa_addr == NULL) + continue; + + if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) { + if (interface == NULL) { + *ip = strdup(host); + status = CELIX_SUCCESS; + } + else if (strcmp(ifa->ifa_name, interface) == 0) { + *ip = strdup(host); + status = CELIX_SUCCESS; + } + } + } + + freeifaddrs(ifaddr); + } + + return status; +} +#endif \ No newline at end of file http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h new file mode 100644 index 0000000..011d272 --- /dev/null +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h @@ -0,0 +1,92 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#ifndef CELIX_PUBSUB_UDPMC_ADMIN_H +#define CELIX_PUBSUB_UDPMC_ADMIN_H + +#include "celix_api.h" +#include "log_helper.h" + +#define PUBSUB_UDPMC_ADMIN_TYPE "udpmc" +#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY "pubsub.udpmc.socket_address" + +#define PUBSUB_UDPMC_IP_KEY "PSA_IP" +#define PUBSUB_UDPMC_ITF_KEY "PSA_INTERFACE" +#define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY "PSA_MC_PREFIX" +#define PUBSUB_UDPMC_VERBOSE_KEY "PSA_UDPMC_VERBOSE" + +#define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT "224.100" +#define PUBSUB_UDPMC_VERBOSE_DEFAULT "true" + + +typedef struct pubsub_udpmc_admin { + celix_bundle_context_t *ctx; + log_helper_t *log; + char* ifIpAddress; // The local interface which is used for multicast communication + char* mcIpAddress; // The multicast IP address + int sendSocket; + double qosSampleScore; + double qosControlScore; + double defaultScore; + bool verbose; + const char *fwUUID; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = scope:topic key, value = pubsub_udpmc_topic_sender_t + } topicSenders; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = scope:topic key, value = topic receiver (TODO type) + } topicReceivers; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = endpoint uuid, value = psa_udpmc_connected_endpoint_t + } connectedEndpoints; + +} pubsub_udpmc_admin_t; + +typedef struct psa_udpmc_connected_endpoint { + void *sender; //if connected endpoint is subscriber. TODO type + void *receiver; //if connected endpoint is publisher. 
TODO type + char *endpointUUID; +} psa_udpmc_connected_endpoint_t; + +pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper); +void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa); + +celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId); +celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId); +celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *score); + +celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint); +celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic); + +celix_status_t pubsub_udpmcAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint); +celix_status_t pubsub_udpmcAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic); + +celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint); +celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint); + +celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine, FILE *outStream, FILE *errStream); + +#endif //CELIX_PUBSUB_UDPMC_ADMIN_H + http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c new file mode 100644 index 0000000..5553d3b --- /dev/null +++ 
b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c @@ -0,0 +1,99 @@ +#include <pubsub_serializer.h> +#include <stdlib.h> +#include <memory.h> +#include <pubsub_constants.h> +#include "pubsub_udpmc_topic_sender.h" +#include "pubsub_psa_udpmc_constants.h" + +static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props); + +struct pubsub_updmc_topic_sender { + celix_bundle_context_t *ctx; + char *scope; + char *topic; + char *socketAddress; + + long serTrackerId; + struct { + celix_thread_mutex_t mutex; + pubsub_serializer_service_t *svc; + const celix_properties_t *props; + } serializer; +}; + +pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, long serializerSvcId) { + pubsub_updmc_topic_sender_t *sender = calloc(1, sizeof(*sender)); + sender->ctx = ctx; + sender->scope = strndup(scope, 1024 * 1024); + sender->topic = strndup(topic, 1024 * 1024); + + celixThreadMutex_create(&sender->serializer.mutex, NULL); + + char filter[64]; + snprintf(filter, 64, "(service.id=%li)", serializerSvcId); + + celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; + opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME; + opts.filter.filter = filter; + opts.filter.ignoreServiceLanguage = true; + opts.callbackHandle = sender; + opts.setWithProperties = pubsub_udpmcTopicSender_setSerializer; + sender->serTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); + + return sender; +} + +void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender) { + if (sender != NULL) { + celix_bundleContext_stopTracker(sender->ctx, sender->serTrackerId); + + celixThreadMutex_destroy(&sender->serializer.mutex); + + free(sender->scope); + free(sender->topic); + free(sender); + } +} + +const char* pubsub_udpmcTopicSender_psaType(pubsub_updmc_topic_sender_t *sender __attribute__((unused))) { + return 
PSA_UDPMC_PUBSUB_ADMIN_TYPE; +} + +const char* pubsub_udpmcTopicSender_serializerType(pubsub_updmc_topic_sender_t *sender) { + const char *result = NULL; + celixThreadMutex_lock(&sender->serializer.mutex); + if (sender->serializer.props != NULL) { + result = celix_properties_get(sender->serializer.props, PUBSUB_SERIALIZER_TYPE_KEY, NULL); + } + celixThreadMutex_unlock(&sender->serializer.mutex); + return result; +} + +const char* pubsub_udpmcTopicSender_scope(pubsub_updmc_topic_sender_t *sender) { + return sender->scope; +} + +const char* pubsub_udpmcTopicSender_topic(pubsub_updmc_topic_sender_t *sender) { + return sender->topic; +} + +const char* pubsub_udpmcTopicSender_socketAddress(pubsub_updmc_topic_sender_t *sender) { + return sender->socketAddress; +} + +void pubsub_udpmcTopicSender_connectTo(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) { + //TODO +} + +void pubsub_udpmcTopicSender_disconnectFrom(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) { + //TODO +} + +static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props) { + pubsub_updmc_topic_sender_t *sender = handle; + pubsub_serializer_service_t *ser = svc; + celixThreadMutex_lock(&sender->serializer.mutex); + sender->serializer.svc = ser; + sender->serializer.props = props; + celixThreadMutex_unlock(&sender->serializer.mutex); +} \ No newline at end of file