CELIX-454: More PubSub. The UDPMC is now somewhat working again, but still needs some testing.
Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/2eb219e3 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/2eb219e3 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/2eb219e3 Branch: refs/heads/feature/CELIX-454-pubsub-disc Commit: 2eb219e393cfafd47a99739da3e56e0ab882799a Parents: 69596cf Author: Pepijn Noltes <pepijnnol...@gmail.com> Authored: Wed Sep 26 21:13:50 2018 +0200 Committer: Pepijn Noltes <pepijnnol...@gmail.com> Committed: Wed Sep 26 21:13:50 2018 +0200 ---------------------------------------------------------------------- .../pubsub/pubsub_admin_udp_mc/CMakeLists.txt | 2 + .../pubsub_admin_udp_mc/src/psa_activator.c | 6 +- .../src/pubsub_udpmc_admin.c | 295 +++++++++- .../src/pubsub_udpmc_admin.h | 47 +- .../src/pubsub_udpmc_common.c | 40 ++ .../src/pubsub_udpmc_common.h | 39 ++ .../src/pubsub_udpmc_topic_receiver.c | 450 +++++++++++++++ .../src/pubsub_udpmc_topic_receiver.h | 45 ++ .../src/pubsub_udpmc_topic_sender.c | 305 +++++++++- .../src/pubsub_udpmc_topic_sender.h | 10 +- .../pubsub_admin_udp_mc/src/topic_publication.c | 2 +- .../pubsub_admin_zmq/src/topic_subscription.c | 2 +- .../pubsub/pubsub_discovery/src/psd_activator.c | 19 +- .../src/pubsub_discovery_impl.c | 32 +- .../src/pubsub_discovery_impl.h | 8 +- .../pubsub/pubsub_spi/include/pubsub_admin.h | 2 +- .../pubsub/pubsub_spi/include/pubsub_common.h | 2 +- .../pubsub/pubsub_spi/include/pubsub_utils.h | 5 +- bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 9 +- .../pubsub/pubsub_spi/src/pubsub_utils_match.c | 33 +- .../src/pubsub_topology_manager.c | 565 +++++++++++-------- .../src/pubsub_topology_manager.h | 15 +- libs/framework/include/celix_bundle_context.h | 2 +- 23 files changed, 1576 insertions(+), 359 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt index 94ce5cc..3f74376 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt +++ b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt @@ -25,6 +25,8 @@ add_celix_bundle(celix_pubsub_admin_udp_multicast src/psa_activator.c src/pubsub_udpmc_admin.c src/pubsub_udpmc_topic_sender.c + src/pubsub_udpmc_topic_receiver.c + src/pubsub_udpmc_common.c #src/pubsub_admin_impl.c #src/topic_subscription.c #src/topic_publication.c http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c index 406720e..682efc5 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c @@ -59,8 +59,8 @@ int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) { psaSvc->matchEndpoint = pubsub_udpmcAdmin_matchEndpoint; psaSvc->setupTopicSender = pubsub_udpmcAdmin_setupTopicSender; psaSvc->teardownTopicSender = pubsub_udpmcAdmin_teardownTopicSender; - psaSvc->setupTopicReciever = pubsub_udpmcAdmin_setupTopicReciever; - psaSvc->teardownTopicReciever = pubsub_udpmcAdmin_teardownTopicReciever; + psaSvc->setupTopicReciever = pubsub_udpmcAdmin_setupTopicReceiver; + psaSvc->teardownTopicReciever = pubsub_udpmcAdmin_teardownTopicReceiver; psaSvc->addEndpoint = pubsub_udpmcAdmin_addEndpoint; psaSvc->removeEndpoint = pubsub_udpmcAdmin_removeEndpoint; @@ -78,7 +78,7 @@ int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) { celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_udpmc"); celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_udpmc"); celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the UDPMC PSA"); - celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props); + act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props); } return status; http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c index 650e439..3e06334 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c @@ -1,3 +1,21 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ #include <memory.h> #include <sys/socket.h> @@ -11,8 +29,11 @@ #include "pubsub_udpmc_admin.h" #include "pubsub_psa_udpmc_constants.h" #include "pubsub_udpmc_topic_sender.h" +#include "pubsub_udpmc_topic_receiver.h" #define PUBSUB_UDPMC_MC_IP_DEFAULT "224.100.1.1" +#define PUBSUB_UDPMC_SOCKET_ADDRESS_KEY "udpmc.socket_address" +#define PUBSUB_UDPMC_SOCKET_PORT_KEY "udpmc.socker_port" #define LOG_DEBUG(...) \ logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) @@ -23,8 +44,46 @@ #define LOG_ERROR(...) \ logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__) +struct pubsub_udpmc_admin { + celix_bundle_context_t *ctx; + log_helper_t *log; + char *ifIpAddress; // The local interface which is used for multicast communication + char *mcIpAddress; // The multicast IP address + int sendSocket; + double qosSampleScore; + double qosControlScore; + double defaultScore; + bool verbose; + const char *fwUUID; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = scope:topic key, value = pubsub_udpmc_topic_sender_t + } topicSenders; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = scope:topic key, value = pubsub_udpmc_topic_sender_t + } topicReceivers; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = endpoint uuid, value = psa_udpmc_connected_endpoint_entry_t + } discoveredEndpoints; + +}; + +typedef struct psa_udpmc_connected_endpoint { + void *sender; //if connected endpoint is subscriber. todo type + void *receiver; //if connected endpoint is publisher. TODO type + char *endpointUUID; +} psa_udpmc_connected_endpoint_t; + static celix_status_t udpmc_getIpAddress(const char* interface, char** ip); +static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint); +static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint); + pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) { pubsub_udpmc_admin_t *psa = calloc(1, sizeof(*psa)); @@ -115,8 +174,8 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_ celixThreadMutex_create(&psa->topicReceivers.mutex, NULL); psa->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - celixThreadMutex_create(&psa->connectedEndpoints.mutex, NULL); - psa->connectedEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL); + psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); return psa; } @@ -128,14 +187,38 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) { //note assuming al psa register services and service tracker are removed. + celixThreadMutex_lock(&psa->topicSenders.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_updmc_topic_sender_t *sender = hashMapIterator_nextValue(&iter); + pubsub_udpmcTopicSender_destroy(sender); + } + celixThreadMutex_unlock(&psa->topicSenders.mutex); + + celixThreadMutex_lock(&psa->topicReceivers.mutex); + iter = hashMapIterator_construct(psa->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_updmc_topic_receiver_t *recv = hashMapIterator_nextValue(&iter); + pubsub_udpmcTopicReceiver_destroy(recv); + } + celixThreadMutex_unlock(&psa->topicReceivers.mutex); + + celixThreadMutex_lock(&psa->discoveredEndpoints.mutex); + iter = hashMapIterator_construct(psa->discoveredEndpoints.map); + while (hashMapIterator_hasNext(&iter)) { + celix_properties_t *ep = hashMapIterator_nextValue(&iter); + celix_properties_destroy(ep); + } + celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex); + celixThreadMutex_destroy(&psa->topicSenders.mutex); hashMap_destroy(psa->topicSenders.map, true, false); celixThreadMutex_destroy(&psa->topicReceivers.mutex); hashMap_destroy(psa->topicReceivers.map, true, false); - celixThreadMutex_destroy(&psa->connectedEndpoints.mutex); - hashMap_destroy(psa->connectedEndpoints.map, true, false); + celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex); + hashMap_destroy(psa->discoveredEndpoints.map, true, false); free(psa->mcIpAddress); free(psa->ifIpAddress); @@ -165,14 +248,13 @@ celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderB return status; } -celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *outScore) { +celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) { pubsub_udpmc_admin_t *psa = handle; LOG_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchEndpoint"); celix_status_t status = CELIX_SUCCESS; - double score = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_UDPMC_ADMIN_TYPE, - psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, NULL); - if (outScore != NULL) { - *outScore = score; + bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_UDPMC_ADMIN_TYPE, NULL); + if (outMatch != NULL) { + *outMatch = match; } return status; } @@ -192,11 +274,18 @@ celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scop celixThreadMutex_lock(&psa->topicSenders.mutex); pubsub_updmc_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key); if (sender == NULL) { - sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId); + sender = pubsub_udpmcTopicSender_create(psa->ctx, scope, topic, serializerSvcId, psa->sendSocket, psa->mcIpAddress); const char *psaType = pubsub_udpmcTopicSender_psaType(sender); const char *serType = pubsub_udpmcTopicSender_serializerType(sender); newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL); + celix_properties_set(newEndpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, pubsub_udpmcTopicSender_socketAddress(sender)); + celix_properties_setLong(newEndpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, pubsub_udpmcTopicSender_socketPort(sender)); + //if available also set container name + const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL); + if (cn != NULL) { + celix_properties_set(newEndpoint, "container_name", cn); + } bool valid = pubsubEndpoint_isValid(newEndpoint, true, true); if (!valid) { LOG_ERROR("[PSA UDPMC] Error creating a valid TopicSender. Endpoints are not valid"); @@ -205,7 +294,7 @@ celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scop free(key); } else { hashMap_put(psa->topicSenders.map, key, sender); - //TODO connect endpoints to sender + //TODO connect endpoints to sender, NOTE is this needed for a udpmc topic sender? } } else { free(key); @@ -235,7 +324,7 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s char *mapKey = hashMapEntry_getKey(entry); pubsub_updmc_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key); free(mapKey); - //TODO disconnect endpoints to sender + //TODO disconnect endpoints to sender. note is this needed for a udpmc topic sender? pubsub_udpmcTopicSender_destroy(sender); } else { LOG_ERROR("[PSA UDPMC] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic); @@ -246,30 +335,175 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s return status; } -celix_status_t pubsub_udpmcAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) { +celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) { pubsub_udpmc_admin_t *psa = handle; - LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_setupTopicReciever. scope/topic: %s/%s", scope, topic); + + celix_properties_t *newEndpoint = NULL; + + char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); + celixThreadMutex_lock(&psa->topicReceivers.mutex); + pubsub_updmc_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key); + if (receiver == NULL) { + receiver = pubsub_udpmcTopicReceiver_create(psa->ctx, scope, topic, psa->ifIpAddress, serializerSvcId); + const char *psaType = pubsub_udpmcTopicReceiver_psaType(receiver); + const char *serType = pubsub_udpmcTopicReceiver_serializerType(receiver); + newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, + PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL); + + //if available also set container name + const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL); + if (cn != NULL) { + celix_properties_set(newEndpoint, "container_name", cn); + } + bool valid = pubsubEndpoint_isValid(newEndpoint, true, true); + if (!valid) { + LOG_ERROR("[PSA UDPMC] Error creating a valid TopicReceiver. Endpoints are not valid"); + celix_properties_destroy(newEndpoint); + pubsub_udpmcTopicReceiver_destroy(receiver); + free(key); + } else { + hashMap_put(psa->topicReceivers.map, key, receiver); + + celixThreadMutex_lock(&psa->discoveredEndpoints.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map); + while (hashMapIterator_hasNext(&iter)) { + celix_properties_t *endpoint = hashMapIterator_nextValue(&iter); + pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint); + } + celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex); + } + } else { + free(key); + LOG_ERROR("[PSA_UDPMC] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic); + } + celixThreadMutex_unlock(&psa->topicReceivers.mutex); + + + if (newEndpoint != NULL && outSubscriberEndpoint != NULL) { + *outSubscriberEndpoint = newEndpoint; + } + celix_status_t status = CELIX_SUCCESS; return status; } -celix_status_t pubsub_udpmcAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic) { +celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic) { pubsub_udpmc_admin_t *psa = handle; - LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_teardownTopicReciever. scope/topic: %s/%s", scope, topic); + + char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); + celixThreadMutex_lock(&psa->topicReceivers.mutex); + hash_map_entry_t *entry = hashMap_getEntry(psa->topicReceivers.map, key); + free(key); + if (entry != NULL) { + char *receiverKey = hashMapEntry_getKey(entry); + pubsub_updmc_topic_receiver_t *receiver = hashMapEntry_getValue(entry); + hashMap_remove(psa->topicReceivers.map, receiverKey); + + free(receiverKey); + pubsub_udpmcTopicReceiver_destroy(receiver); + } + celixThreadMutex_lock(&psa->topicReceivers.mutex); + celix_status_t status = CELIX_SUCCESS; return status; } +static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) { + //note can be called with discoveredEndpoint.mutex lock + celix_status_t status = CELIX_SUCCESS; + + const char *scope = pubsub_udpmcTopicReceiver_scope(receiver); + const char *topic = pubsub_udpmcTopicReceiver_topic(receiver); + + const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL); + const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); + const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); + const char *sockAdress = celix_properties_get(endpoint, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, NULL); + long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY, -1L); + + if (type == NULL || sockAdress == NULL || sockPort < 0) { + fprintf(stderr, "[PSA UPDMC] Error got endpoint without udpmc socket address/port or endpoint type"); + status = CELIX_BUNDLE_EXCEPTION; + } else { + if (eScope != NULL && eTopic != NULL && type != NULL && + strncmp(eScope, scope, 1024 * 1024) == 0 && + strncmp(eTopic, topic, 1024 * 1024) == 0 && + strncmp(type, PUBSUB_PUBLISHER_ENDPOINT_TYPE, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { + pubsub_udpmcTopicReceiver_connectTo(receiver, sockAdress, sockPort); + } + } + + return status; +} + celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) { pubsub_udpmc_admin_t *psa = handle; - LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_addEndpoint"); + celixThreadMutex_lock(&psa->topicReceivers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter); + pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint); + } + celixThreadMutex_unlock(&psa->topicReceivers.mutex); + + celixThreadMutex_lock(&psa->discoveredEndpoints.mutex); + celix_properties_t *cpy = celix_properties_copy(endpoint); + const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, NULL); + hashMap_put(psa->discoveredEndpoints.map, (void*)uuid, cpy); + celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex); + celix_status_t status = CELIX_SUCCESS; return status; } + +static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_udpmc_admin_t* psa, pubsub_updmc_topic_receiver_t *receiver, const celix_properties_t *endpoint) { + //note can be called with discoveredEndpoint.mutex lock + celix_status_t status = CELIX_SUCCESS; + + const char *scope = pubsub_udpmcTopicReceiver_scope(receiver); + const char *topic = pubsub_udpmcTopicReceiver_topic(receiver); + + const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL); + const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); + const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); + const char *sockAdress = celix_properties_get(endpoint, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, NULL); + long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY, -1L); + + if (type == NULL || sockAdress == NULL || sockPort < 0) { + fprintf(stderr, "[PSA UPDMC] Error got endpoint without udpmc socket address/port or endpoint type"); + status = CELIX_BUNDLE_EXCEPTION; + } else { + if (eScope != NULL && eTopic != NULL && type != NULL && + strncmp(eScope, scope, 1024 * 1024) == 0 && + strncmp(eTopic, topic, 1024 * 1024) == 0 && + strncmp(type, PUBSUB_PUBLISHER_ENDPOINT_TYPE, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { + pubsub_udpmcTopicReceiver_disconnectFrom(receiver, sockAdress, sockPort); + } + } + + return status; +} + celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) { pubsub_udpmc_admin_t *psa = handle; - LOG_INFO("[PSA_UDPMC] pubsub_udpmcAdmin_removeEndpoint"); + celixThreadMutex_lock(&psa->topicReceivers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter); + pubsub_udpmcAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint); + } + celixThreadMutex_unlock(&psa->topicReceivers.mutex); + + celixThreadMutex_lock(&psa->discoveredEndpoints.mutex); + const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL); + celix_properties_t *found = hashMap_remove(psa->discoveredEndpoints.map, (void*)uuid); + celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex); + + if (found != NULL) { + celix_properties_destroy(found); + } + celix_status_t status = CELIX_SUCCESS; return status; } @@ -278,7 +512,8 @@ celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine pubsub_udpmc_admin_t *psa = handle; celix_status_t status = CELIX_SUCCESS; - fprintf(out, "\nTopic Senders:\n"); + fprintf(out, "\n"); + fprintf(out, "Topic Senders:\n"); celixThreadMutex_lock(&psa->topicSenders.mutex); hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map); while (hashMapIterator_hasNext(&iter)) { @@ -287,16 +522,32 @@ celix_status_t pubsub_udpmcAdmin_executeCommand(void *handle, char *commandLine const char *serType = pubsub_udpmcTopicSender_serializerType(sender); const char *scope = pubsub_udpmcTopicSender_scope(sender); const char *topic = pubsub_udpmcTopicSender_topic(sender); - const char *url = pubsub_udpmcTopicSender_socketAddress(sender); + const char *sockAddr = pubsub_udpmcTopicSender_socketAddress(sender); + fprintf(out, "|- Topic Sender %s/%s\n", scope, topic); + fprintf(out, " |- psa type = %s\n", psaType); + fprintf(out, " |- serializer type = %s\n", serType); + fprintf(out, " |- socket address = %s\n", sockAddr); + } + celixThreadMutex_unlock(&psa->topicSenders.mutex); + + fprintf(out, "\n"); + fprintf(out, "\nTopic Receivers:\n"); + celixThreadMutex_lock(&psa->topicReceivers.mutex); + iter = hashMapIterator_construct(psa->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_updmc_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter); + const char *psaType = pubsub_udpmcTopicReceiver_psaType(receiver); + const char *serType = pubsub_udpmcTopicReceiver_serializerType(receiver); + const char *scope = pubsub_udpmcTopicReceiver_scope(receiver); + const char *topic = pubsub_udpmcTopicReceiver_topic(receiver); fprintf(out, "|- Topic Sender %s/%s\n", scope, topic); fprintf(out, " |- psa type = %s\n", psaType); fprintf(out, " |- serializer type = %s\n", serType); - fprintf(out, " |- url = %s\n", url); } celixThreadMutex_unlock(&psa->topicSenders.mutex); fprintf(out, "\n"); - //TODO topic receivers + //TODO topic receivers/senders connection count return status; } http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h index 011d272..11c7e13 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h @@ -23,8 +23,9 @@ #include "celix_api.h" #include "log_helper.h" -#define PUBSUB_UDPMC_ADMIN_TYPE "udpmc" -#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY "pubsub.udpmc.socket_address" +#define PUBSUB_UDPMC_ADMIN_TYPE "udp_mc" +#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY "udpmc.socket_address" +#define PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY "udpmc.socker_port" #define PUBSUB_UDPMC_IP_KEY "PSA_IP" #define PUBSUB_UDPMC_ITF_KEY "PSA_INTERFACE" @@ -34,54 +35,20 @@ #define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT "224.100" #define PUBSUB_UDPMC_VERBOSE_DEFAULT "true" - -typedef struct pubsub_udpmc_admin { - celix_bundle_context_t *ctx; - log_helper_t *log; - char* ifIpAddress; // The local interface which is used for multicast communication - char* mcIpAddress; // The multicast IP address - int sendSocket; - double qosSampleScore; - double qosControlScore; - double defaultScore; - bool verbose; - const char *fwUUID; - - struct { - celix_thread_mutex_t mutex; - hash_map_t *map; //key = scope:topic key, value = pubsub_udpmc_topic_sender_t - } topicSenders; - - struct { - celix_thread_mutex_t mutex; - hash_map_t *map; //key = scope:topic key, value = pubsub_udpmc_topic_sender_t - } topicReceivers; - - struct { - celix_thread_mutex_t mutex; - hash_map_t *map; //key = endpoint uuid, value = psa_udpmc_connected_endpoint_entry_t - } connectedEndpoints; - -} pubsub_udpmc_admin_t; - -typedef struct psa_udpmc_connected_endpoint { - void *sender; //if connected endpoint is subscriber. todo type - void *receiver; //if connected endpoint is publisher. TODO type - char *endpointUUID; -} psa_udpmc_connected_endpoint_t; +typedef struct pubsub_udpmc_admin pubsub_udpmc_admin_t; pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper); void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa); celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId); celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId); -celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, double *score); +celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match); celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint); celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic); -celix_status_t pubsub_udpmcAdmin_setupTopicReciever(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint); -celix_status_t pubsub_udpmcAdmin_teardownTopicReciever(void *handle, const char *scope, const char *topic); +celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint); +celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic); celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint); celix_status_t pubsub_udpmcAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint); http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c new file mode 100644 index 0000000..a039e6c --- /dev/null +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c @@ -0,0 +1,40 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#include "pubsub_udpmc_common.h" + +int psa_udpmc_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId) { + *msgTypeId = utils_stringHash(msgType); + return 0; +} + +bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_msg_header_t *hdr) { + bool check=false; + int major=0,minor=0; + + if(msgVersion!=NULL){ + version_getMajor(msgVersion,&major); + version_getMinor(msgVersion,&minor); + if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */ + check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */ + } + } + + return check; +} http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h new file mode 100644 index 0000000..e49aaa7 --- /dev/null +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h @@ -0,0 +1,39 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#ifndef CELIX_PUBSUB_UDPMC_COMMON_H +#define CELIX_PUBSUB_UDPMC_COMMON_H + +#include <utils.h> + +#include "version.h" +#include "pubsub_common.h" + +typedef struct pubsub_udp_msg { + struct pubsub_msg_header header; + unsigned int payloadSize; + char payload[]; +} pubsub_udp_msg_t; + +int psa_udpmc_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId); + +bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_msg_header_t *hdr); + + +#endif //CELIX_PUBSUB_UDPMC_COMMON_H http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c new file mode 100644 index 0000000..f3e320a --- /dev/null +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c @@ -0,0 +1,450 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#include <pubsub_serializer.h> +#include <stdlib.h> +#include <pubsub/subscriber.h> +#include <memory.h> +#include <pubsub_constants.h> +#include <sys/epoll.h> +#include <assert.h> +#include <pubsub_endpoint.h> +#include <arpa/inet.h> +#include "pubsub_udpmc_topic_receiver.h" +#include "pubsub_psa_udpmc_constants.h" +#include "large_udp.h" +#include "pubsub_udpmc_common.h" + +#define MAX_EPOLL_EVENTS 10 +#define RECV_THREAD_TIMEOUT 5 +#define UDP_BUFFER_SIZE 65535 +#define MAX_UDP_SESSIONS 16 + +struct pubsub_updmc_topic_receiver { + celix_bundle_context_t *ctx; + char *scope; + char *topic; + char* ifIpAddress; + largeUdp_pt largeUdpHandle; + int topicEpollFd; // EPOLL filedescriptor where the sockets are registered. + + //serialiser svc + long serializerTrackerId; + struct { + + celix_thread_mutex_t mutex; //protect svc + pubsub_serializer_service_t *svc; + const celix_properties_t *props; + } serializer; + + struct { + celix_thread_t thread; + celix_thread_mutex_t mutex; + bool running; + } recvThread; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = socketAddress, value = psa_udpmc_requested_connection_entry_t* + } requestedConnections; + + long subscriberTrackerId; + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = bnd id, value = psa_udpmc_subscriber_entry_t + } subscribers; +}; + +typedef struct psa_udpmc_requested_connection_entry { + char *key; + char *socketAddress; + long socketPort; + bool connected; + int recvSocket; +} psa_udpmc_requested_connection_entry_t; + +typedef struct psa_udpmc_subscriber_entry { + int usageCount; + hash_map_t *msgTypes; //map from serializer svc + pubsub_subscriber_t *svc; +} psa_udpmc_subscriber_entry_t; + + +typedef struct pubsub_msg{ + pubsub_msg_header_t *header; + char* payload; + unsigned int payloadSize; +} pubsub_msg_t; + +static void pubsub_udpmcTopicReceiver_setSerializer(void *handle, void *svc, const celix_properties_t *props); +static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner); +static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner); +static void psa_udpmc_processMsg(pubsub_updmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg); +static void* psa_udpmc_recvThread(void * data); + + +pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx, + const char *scope, + const char *topic, + const char *ifIP, + long serializerSvcId) { + pubsub_updmc_topic_receiver_t *receiver = calloc(1, sizeof(*receiver)); + receiver->ctx = ctx; + receiver->scope = strndup(scope, 1024 * 1024); + receiver->topic = strndup(topic, 1024 * 1024); + receiver->ifIpAddress = strndup(ifIP, 1024 * 1024); + receiver->recvThread.running = true; + receiver->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS); + receiver->topicEpollFd = epoll_create1(0); + + + celixThreadMutex_create(&receiver->serializer.mutex, NULL); + celixThreadMutex_create(&receiver->subscribers.mutex, NULL); + celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL); + celixThreadMutex_create(&receiver->recvThread.mutex, NULL); + + receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL); + receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + + //track serializer svc based on the provided serializerSvcId + { + char filter[64]; + snprintf(filter, 64, "(service.id=%li)", serializerSvcId); + + celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; + opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME; + opts.filter.filter = filter; + opts.filter.ignoreServiceLanguage = true; + opts.callbackHandle = receiver; + opts.setWithProperties = pubsub_udpmcTopicReceiver_setSerializer; + receiver->serializerTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); + } + + //track subscribers + { + int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic); + char buf[size+1]; + snprintf(buf, (size_t)size+1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic); + celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; + opts.filter.ignoreServiceLanguage = true; + opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; + opts.filter.filter = buf; + opts.callbackHandle = receiver; + opts.addWithOwner = pubsub_udpmcTopicReceiver_addSubscriber; + opts.removeWithOwner = pubsub_udpmcTopicReceiver_removeSubscriber; + + receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); + } + + celixThread_create(&receiver->recvThread.thread, NULL, psa_udpmc_recvThread, receiver); + + return receiver; +} + +void pubsub_udpmcTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver) { + if (receiver != NULL) { + celix_bundleContext_stopTracker(receiver->ctx, receiver->serializerTrackerId); + celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId); + + celixThreadMutex_lock(&receiver->recvThread.mutex); + receiver->recvThread.running = false; + celixThreadMutex_unlock(&receiver->recvThread.mutex); + celixThread_join(receiver->recvThread.thread, NULL); + + celixThreadMutex_destroy(&receiver->serializer.mutex); + celixThreadMutex_destroy(&receiver->subscribers.mutex); + celixThreadMutex_destroy(&receiver->requestedConnections.mutex); + celixThreadMutex_destroy(&receiver->recvThread.mutex); + + largeUdp_destroy(receiver->largeUdpHandle); + //TODO cleanup entries, free map + + //TODO clean up requested connections map + } + free(receiver); +} + +const char* pubsub_udpmcTopicReceiver_psaType(pubsub_updmc_topic_receiver_t *receiver) { + return PSA_UDPMC_PUBSUB_ADMIN_TYPE; +} + +const char* pubsub_udpmcTopicReceiver_serializerType(pubsub_updmc_topic_receiver_t *receiver) { + const char *result = NULL; + celixThreadMutex_lock(&receiver->serializer.mutex); + if (receiver->serializer.props != NULL) { + result = celix_properties_get(receiver->serializer.props, PUBSUB_SERIALIZER_TYPE_KEY, NULL); + } + celixThreadMutex_unlock(&receiver->serializer.mutex); + return result; +} + +const char* pubsub_udpmcTopicReceiver_scope(pubsub_updmc_topic_receiver_t *receiver) { + return receiver->scope; +} +const char* pubsub_udpmcTopicReceiver_topic(pubsub_updmc_topic_receiver_t *receiver) { + return receiver->topic; +} + +void pubsub_udpmcTopicReceiver_connectTo( + pubsub_updmc_topic_receiver_t *receiver, + const char *socketAddress, + long socketPort) { + printf("[PSA UDPMC] TopicReceiver %s/%s connect to socket address = %s:%li\n", receiver->scope, receiver->topic, socketAddress, socketPort); + + int len = snprintf(NULL, 0, "%s:%li", socketAddress, socketPort); + char *key = calloc((size_t)len+1, sizeof(char)); + snprintf(key, (size_t)len+1, "%s:%li", socketAddress, socketPort); + + celixThreadMutex_lock(&receiver->requestedConnections.mutex); + psa_udpmc_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, key); + if (entry == NULL) { + entry = calloc(1, sizeof(*entry)); + entry->key = key; + entry->socketAddress = strndup(socketAddress, 1024 * 1024); + entry->socketPort = socketPort; + entry->connected = false; + hashMap_put(receiver->requestedConnections.map, (void*)entry->key, entry); + } else { + free(key); + } + if (!entry->connected) { + int rc = 0; + entry->recvSocket = socket(AF_INET, SOCK_DGRAM, 0); + if (entry->recvSocket >= 0) { + int reuse = 1; + rc = setsockopt(entry->recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)); + } + if (entry->recvSocket >= 0 && rc >= 0) { + struct ip_mreq mc_addr; + mc_addr.imr_multiaddr.s_addr = inet_addr(entry->socketAddress); + mc_addr.imr_interface.s_addr = inet_addr(receiver->ifIpAddress); + printf("Adding MC %s at interface %s\n", entry->socketAddress, receiver->ifIpAddress); + rc = setsockopt(entry->recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) &mc_addr, sizeof(mc_addr)); + } + if (entry->recvSocket >= 0 && rc >= 0) { + struct sockaddr_in mcListenAddr; + mcListenAddr.sin_family = AF_INET; + mcListenAddr.sin_addr.s_addr = INADDR_ANY; + mcListenAddr.sin_port = htons((uint16_t )entry->socketPort); + rc = bind(entry->recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)); + } + if (entry->recvSocket >= 0 && rc >= 0) { + struct epoll_event ev; + memset(&ev, 0, sizeof(ev)); + ev.events = EPOLLIN; + ev.data.fd = entry->recvSocket; + rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_ADD, entry->recvSocket, &ev); + } + + if (entry->recvSocket < 0 || rc < 0) { + fprintf(stderr, "[PSA UDPMC] Error connecting TopicReceiver %s/%s to %s:%li. (%s)\n", receiver->scope, receiver->topic, socketAddress, socketPort, strerror(errno)); + } else { + entry->connected = true; + } + } + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); +} + +void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort) { + printf("[PSA UDPMC] TopicReceiver %s/%s disconnect from socket address = %s:%li\n", receiver->scope, receiver->topic, socketAddress, socketPort); + + int len = snprintf(NULL, 0, "%s:%li", socketAddress, socketPort); + char *key = calloc((size_t)len+1, sizeof(char)); + snprintf(key, (size_t)len+1, "%s:%li", socketAddress, socketPort); + + celixThreadMutex_lock(&receiver->requestedConnections.mutex); + psa_udpmc_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, key); + free(key); + if (entry != NULL && entry->connected) { + struct epoll_event ev; + memset(&ev, 0, sizeof(ev)); + int rc = epoll_ctl(receiver->topicEpollFd, EPOLL_CTL_DEL, entry->recvSocket, &ev); + if (rc < 0) { + fprintf(stderr, "[PSA UDPMC] Error disconnecting TopicReceiver %s/%s to %s:%li.\n%s", receiver->scope, receiver->topic, socketAddress, socketPort, strerror(errno)); + } + } + if (entry != NULL) { + free(entry->key); + free(entry->socketAddress); + free(entry); + } + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); +} + + +static void pubsub_udpmcTopicReceiver_setSerializer(void *handle, void *svc, const celix_properties_t *props) { + pubsub_updmc_topic_receiver_t *receiver = handle; + pubsub_serializer_service_t *ser = svc; + + if (ser == NULL) { + //TODO -> no serializer -> remove all publishers + } + + celixThreadMutex_lock(&receiver->serializer.mutex); + receiver->serializer.svc = ser; + receiver->serializer.props = props; + celixThreadMutex_unlock(&receiver->serializer.mutex); +} + +static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { + pubsub_updmc_topic_receiver_t *receiver = handle; + + long bndId = celix_bundle_getId(bnd); + const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); + if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) { + //not the same scope. ignore + return; + } + + celixThreadMutex_lock(&receiver->subscribers.mutex); + psa_udpmc_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId); + if (entry != NULL) { + entry->usageCount += 1; + } else { + //new create entry + entry = calloc(1, sizeof(*entry)); + entry->usageCount = 1; + entry->svc = svc; + + celixThreadMutex_lock(&receiver->serializer.mutex); + if (receiver->serializer.svc != NULL) { + receiver->serializer.svc->createSerializerMap(receiver->serializer.svc->handle, (celix_bundle_t*)bnd, &entry->msgTypes); + } else { + fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", receiver->scope, receiver->topic); + } + celixThreadMutex_unlock(&receiver->serializer.mutex); + + hashMap_put(receiver->subscribers.map, (void*)bndId, entry); + + } + celixThreadMutex_unlock(&receiver->subscribers.mutex); +} + +static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { + pubsub_updmc_topic_receiver_t *receiver = handle; + + long bndId = celix_bundle_getId(bnd); + + celixThreadMutex_lock(&receiver->subscribers.mutex); + psa_udpmc_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId); + if (entry != NULL) { + entry->usageCount -= 1; + } + if (entry != NULL && entry->usageCount <= 0) { + //remove entry + hashMap_remove(receiver->subscribers.map, (void*)bndId); + celixThreadMutex_lock(&receiver->serializer.mutex); + if (receiver->serializer.svc != NULL) { + receiver->serializer.svc->destroySerializerMap(receiver->serializer.svc->handle, entry->msgTypes); + } else { + fprintf(stderr, "Cannot find serializer for TopicReceiver %s/%s", receiver->scope, receiver->topic); + } + celixThreadMutex_unlock(&receiver->serializer.mutex); + free(entry); + } + celixThreadMutex_unlock(&receiver->subscribers.mutex); +} + +static void* psa_udpmc_recvThread(void * data) { + pubsub_updmc_topic_receiver_t *receiver = data; + + struct epoll_event events[MAX_EPOLL_EVENTS]; + + celixThreadMutex_lock(&receiver->recvThread.mutex); + bool running = receiver->recvThread.running; + celixThreadMutex_unlock(&receiver->recvThread.mutex); + + while (running) { + int nfds = epoll_wait(receiver->topicEpollFd, events, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000); + int i; + for(i = 0; i < nfds; i++ ) { + unsigned int index; + unsigned int size; + if(largeUdp_dataAvailable(receiver->largeUdpHandle, events[i].data.fd, &index, &size) == true) { + // Handle data + pubsub_udp_msg_t *udpMsg = NULL; + if(largeUdp_read(receiver->largeUdpHandle, index, (void**)&udpMsg, size) != 0) { + printf("[PSA_UDPMC]: ERROR largeUdp_read with index %d\n", index); + continue; + } + + psa_udpmc_processMsg(receiver, udpMsg); + + free(udpMsg); + } + } + celixThreadMutex_lock(&receiver->recvThread.mutex); + running = receiver->recvThread.running; + celixThreadMutex_unlock(&receiver->recvThread.mutex); + } + + return NULL; +} + +static void psa_udpmc_processMsg(pubsub_updmc_topic_receiver_t *receiver, pubsub_udp_msg_t *msg) { + celixThreadMutex_lock(&receiver->subscribers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); + while (hashMapIterator_hasNext(&iter)) { + psa_udpmc_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); + + pubsub_msg_serializer_t *msgSer = NULL; + if (entry->msgTypes != NULL) { + msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) msg->header.type); + } + if (msgSer == NULL) { + printf("[PSA_UDPMC] Serializer not available for message %d.\n",msg->header.type); + } else{ + void *msgInst = NULL; + bool validVersion = psa_udpmc_checkVersion(msgSer->msgVersion, &msg->header); + + if(validVersion){ + + celix_status_t status = msgSer->deserialize(msgSer, (const void *) msg->payload, 0, &msgInst); + + if (status == CELIX_SUCCESS) { + bool release = true; + pubsub_multipart_callbacks_t mp_callbacks; + mp_callbacks.handle = receiver; + mp_callbacks.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType; + mp_callbacks.getMultipart = NULL; + + pubsub_subscriber_t *svc = entry->svc; + svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release); + + if(release){ + msgSer->freeMsg(msgSer,msgInst); + } + } + else{ + printf("[PSA_UDPMC] Cannot deserialize msgType %s.\n",msgSer->msgName); + } + + } + else{ + int major=0,minor=0; + version_getMajor(msgSer->msgVersion,&major); + version_getMinor(msgSer->msgVersion,&minor); + printf("[PSA_UDPMC] Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n", + msgSer->msgName,major,minor,msg->header.major,msg->header.minor); + } + + } + } + celixThreadMutex_unlock(&receiver->subscribers.mutex); +} http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h new file mode 100644 index 0000000..ee2c113 --- /dev/null +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h @@ -0,0 +1,45 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +#ifndef CELIX_PUBSUB_UDPMC_TOPIC_RECEIVER_H +#define CELIX_PUBSUB_UDPMC_TOPIC_RECEIVER_H + +#include "celix_bundle_context.h" + +typedef struct pubsub_updmc_topic_receiver pubsub_updmc_topic_receiver_t; + +pubsub_updmc_topic_receiver_t* pubsub_udpmcTopicReceiver_create(celix_bundle_context_t *ctx, + const char *scope, + const char *topic, + const char *ifIP, + long serializerSvcId); +void pubsub_udpmcTopicReceiver_destroy(pubsub_updmc_topic_receiver_t *receiver); + +const char* pubsub_udpmcTopicReceiver_psaType(pubsub_updmc_topic_receiver_t *receiver); +const char* pubsub_udpmcTopicReceiver_serializerType(pubsub_updmc_topic_receiver_t *receiver); +const char* pubsub_udpmcTopicReceiver_scope(pubsub_updmc_topic_receiver_t *receiver); +const char* pubsub_udpmcTopicReceiver_topic(pubsub_updmc_topic_receiver_t *receiver); +const char* pubsub_udpmcTopicReceiver_socketAddress(pubsub_updmc_topic_receiver_t *receiver); + +void pubsub_udpmcTopicReceiver_connectTo( + pubsub_updmc_topic_receiver_t *receiver, + const char *socketAddress, + long socketPort); +void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_updmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort); + +#endif //CELIX_PUBSUB_UDPMC_TOPIC_RECEIVER_H http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c index 5553d3b..4a6d027 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c @@ -1,17 +1,52 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + #include <pubsub_serializer.h> #include <stdlib.h> #include <memory.h> #include <pubsub_constants.h> +#include <pubsub/publisher.h> +#include <utils.h> +#include <pubsub_common.h> +#include <zconf.h> +#include <arpa/inet.h> #include "pubsub_udpmc_topic_sender.h" #include "pubsub_psa_udpmc_constants.h" +#include "large_udp.h" +#include "pubsub_udpmc_common.h" + +#define FIRST_SEND_DELAY_IN_SECONDS 2 + +//TODO make configurable +#define UDP_BASE_PORT 49152 +#define UDP_MAX_PORT 65000 -static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props); struct pubsub_updmc_topic_sender { celix_bundle_context_t *ctx; char *scope; char *topic; char *socketAddress; + long socketPort; + + int sendSocket; + struct sockaddr_in destAddr; long serTrackerId; struct { @@ -19,26 +54,95 @@ struct pubsub_updmc_topic_sender { pubsub_serializer_service_t *svc; const celix_properties_t *props; } serializer; + + struct { + long svcId; + celix_service_factory_t factory; + } publisher; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = bndId, value = psa_udpmc_bounded_service_entry_t + } boundedServices; }; -pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, long serializerSvcId) { +typedef struct psa_udpmc_bounded_service_entry { + pubsub_updmc_topic_sender_t *parent; + pubsub_publisher_t service; + long bndId; + hash_map_t *msgTypes; + int getCount; + largeUdp_pt largeUdpHandle; +} psa_udpmc_bounded_service_entry_t; + +typedef struct pubsub_msg{ + pubsub_msg_header_t *header; + char* payload; + unsigned int payloadSize; +} pubsub_msg_t; + +static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props); +static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties); +static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties); +static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg); +static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback); +static unsigned int rand_range(unsigned int min, unsigned int max); + +pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create( + celix_bundle_context_t *ctx, + const char *scope, + const char *topic, + long serializerSvcId, + int sendSocket, + const char *bindIP) { pubsub_updmc_topic_sender_t *sender = calloc(1, sizeof(*sender)); sender->ctx = ctx; sender->scope = strndup(scope, 1024 * 1024); sender->topic = strndup(topic, 1024 * 1024); celixThreadMutex_create(&sender->serializer.mutex, NULL); + celixThreadMutex_create(&sender->boundedServices.mutex, NULL); + sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL); + + //setting up socket for UDPMC TopicSender + { + unsigned int port = rand_range(UDP_BASE_PORT, UDP_MAX_PORT); + sender->sendSocket = sendSocket; + sender->destAddr.sin_family = AF_INET; + sender->destAddr.sin_addr.s_addr = inet_addr(bindIP); + sender->destAddr.sin_port = htons((uint16_t)port); + + sender->socketAddress = strndup(bindIP, 1024); + sender->socketPort = port; + } + + //track serializer svc based on the provided serializerSvcId + { + char filter[64]; + snprintf(filter, 64, "(service.id=%li)", serializerSvcId); + + celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; + opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME; + opts.filter.filter = filter; + opts.filter.ignoreServiceLanguage = true; + opts.callbackHandle = sender; + opts.setWithProperties = pubsub_udpmcTopicSender_setSerializer; + sender->serTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); + } + + //register publisher services using a service factory + { + sender->publisher.factory.handle = sender; + sender->publisher.factory.getService = psa_udpmc_getPublisherService; + sender->publisher.factory.ungetService = psa_udpmc_ungetPublisherService; - char filter[64]; - snprintf(filter, 64, "(service.id=%li)", serializerSvcId); + celix_properties_t *props = celix_properties_create(); + celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic); + celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope); - celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; - opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME; - opts.filter.filter = filter; - opts.filter.ignoreServiceLanguage = true; - opts.callbackHandle = sender; - opts.setWithProperties = pubsub_udpmcTopicSender_setSerializer; - sender->serTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); + + sender->publisher.svcId = celix_bundleContext_registerServiceFactory(ctx, &sender->publisher.factory, PUBSUB_PUBLISHER_SERVICE_NAME, props); + } return sender; } @@ -46,11 +150,17 @@ pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(celix_bundle_context void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender) { if (sender != NULL) { celix_bundleContext_stopTracker(sender->ctx, sender->serTrackerId); + celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId); celixThreadMutex_destroy(&sender->serializer.mutex); + celixThreadMutex_destroy(&sender->boundedServices.mutex); + + //TODO loop and cleanup? + hashMap_destroy(sender->boundedServices.map, false, true); free(sender->scope); free(sender->topic); + free(sender->socketAddress); free(sender); } } @@ -81,8 +191,12 @@ const char* pubsub_udpmcTopicSender_socketAddress(pubsub_updmc_topic_sender_t *s return sender->socketAddress; } +long pubsub_udpmcTopicSender_socketPort(pubsub_updmc_topic_sender_t *sender) { + return sender->socketPort; +} + void pubsub_udpmcTopicSender_connectTo(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) { - //TODO + //TODO subscriber count -> topic info } void pubsub_udpmcTopicSender_disconnectFrom(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint) { @@ -92,8 +206,173 @@ void pubsub_udpmcTopicSender_disconnectFrom(pubsub_updmc_topic_sender_t *sender, static void pubsub_udpmcTopicSender_setSerializer(void *handle, void *svc, const celix_properties_t *props) { pubsub_updmc_topic_sender_t *sender = handle; pubsub_serializer_service_t *ser = svc; + + if (ser == NULL) { + //TODO -> no serializer -> remove all publishers + } + celixThreadMutex_lock(&sender->serializer.mutex); sender->serializer.svc = ser; sender->serializer.props = props; celixThreadMutex_unlock(&sender->serializer.mutex); -} \ No newline at end of file +} + +static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) { + pubsub_updmc_topic_sender_t *sender = handle; + long bndId = celix_bundle_getId(requestingBundle); + + celixThreadMutex_lock(&sender->boundedServices.mutex); + psa_udpmc_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void*)bndId); + if (entry != NULL) { + entry->getCount += 1; + } else { + entry = calloc(1, sizeof(*entry)); + entry->getCount = 1; + entry->parent = sender; + entry->bndId = bndId; + entry->largeUdpHandle = largeUdp_create(1); + + celixThreadMutex_lock(&sender->serializer.mutex); + celix_status_t rc = CELIX_SUCCESS; + if (sender->serializer.svc != NULL) { + rc = sender->serializer.svc->createSerializerMap(sender->serializer.svc->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes); + } + if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) { + //TODO destroy and return NULL? + fprintf(stderr, "Error creating publisher service, serializer not available / cannot get msg serializer map\n"); + } + celixThreadMutex_unlock(&sender->serializer.mutex); + + + entry->service.handle = entry; + entry->service.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType; + entry->service.send = psa_udpmc_topicPublicationSend; + entry->service.sendMultipart = NULL; //note multipart not supported by MCUDP + + hashMap_put(sender->boundedServices.map, (void*)bndId, entry); + } + celixThreadMutex_unlock(&sender->boundedServices.mutex); + + return &entry->service; +} + +static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) { + pubsub_updmc_topic_sender_t *sender = handle; + long bndId = celix_bundle_getId(requestingBundle); + + celixThreadMutex_lock(&sender->boundedServices.mutex); + psa_udpmc_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void*)bndId); + if (entry != NULL) { + entry->getCount -= 1; + } + if (entry != NULL && entry->getCount == 0) { + //free entry + hashMap_remove(sender->boundedServices.map, (void*)bndId); + + + celixThreadMutex_lock(&sender->serializer.mutex); + celix_status_t rc = CELIX_SUCCESS; + if (sender->serializer.svc != NULL) { + rc = sender->serializer.svc->destroySerializerMap(sender->serializer.svc->handle, entry->msgTypes); + } + if (sender->serializer.svc == NULL || rc != CELIX_SUCCESS) { + fprintf(stderr, "Error destroying publisher service, serializer not available / cannot get msg serializer map\n"); + } + celixThreadMutex_unlock(&sender->serializer.mutex); + + + largeUdp_destroy(entry->largeUdpHandle); + free(entry); + } + celixThreadMutex_unlock(&sender->boundedServices.mutex); +} + +static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) { + psa_udpmc_bounded_service_entry_t *entry = handle; + int status = 0; + + pubsub_msg_serializer_t* msgSer = NULL; + if (entry->msgTypes != NULL) { + msgSer = hashMap_get(entry->msgTypes, (void*)(intptr_t)(msgTypeId)); + } + + if (msgSer != NULL) { + int major=0, minor=0; + + pubsub_msg_header_t *msg_hdr = calloc(1,sizeof(struct pubsub_msg_header)); + strncpy(msg_hdr->topic,entry->parent->topic,MAX_TOPIC_LEN-1); + msg_hdr->type = msgTypeId; + + + if (msgSer->msgVersion != NULL){ + version_getMajor(msgSer->msgVersion, &major); + version_getMinor(msgSer->msgVersion, &minor); + msg_hdr->major = major; + msg_hdr->minor = minor; + } + + void* serializedOutput = NULL; + size_t serializedOutputLen = 0; + msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen); + + pubsub_msg_t *msg = calloc(1,sizeof(pubsub_msg_t)); + msg->header = msg_hdr; + msg->payload = (char*)serializedOutput; + msg->payloadSize = serializedOutputLen; + + + if(psa_udpmc_sendMsg(entry, msg,true, NULL) == false) { + status = -1; + } + free(msg_hdr); + free(msg); + free(serializedOutput); + + + } else { + printf("[PSA_UDPMC/TopicSender] No msg serializer available for msg type id %d\n", msgTypeId); + status=-1; + } + return status; +} + +static void delay_first_send_for_late_joiners(){ + + static bool firstSend = true; + + if(firstSend){ + printf("PSA_UDP_MC_TP: Delaying first send for late joiners...\n"); + sleep(FIRST_SEND_DELAY_IN_SECONDS); + firstSend = false; + } +} + +static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback) { + const int iovec_len = 3; // header + size + payload + bool ret = true; + + struct iovec msg_iovec[iovec_len]; + msg_iovec[0].iov_base = msg->header; + msg_iovec[0].iov_len = sizeof(*msg->header); + msg_iovec[1].iov_base = &msg->payloadSize; + msg_iovec[1].iov_len = sizeof(msg->payloadSize); + msg_iovec[2].iov_base = msg->payload; + msg_iovec[2].iov_len = msg->payloadSize; + + delay_first_send_for_late_joiners(); + + if(largeUdp_sendmsg(entry->largeUdpHandle, entry->parent->sendSocket, msg_iovec, iovec_len, 0, &entry->parent->destAddr, sizeof(entry->parent->destAddr)) == -1) { + perror("send_pubsub_msg:sendSocket"); + ret = false; + } + + if(releaseCallback) { + releaseCallback->release(msg->payload, entry); + } + return ret; +} + +static unsigned int rand_range(unsigned int min, unsigned int max){ + double scaled = ((double)random())/((double)RAND_MAX); + return (unsigned int)((max-min+1)*scaled + min); +} http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h index 38a9127..89e56ec 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h @@ -23,7 +23,13 @@ typedef struct pubsub_updmc_topic_sender pubsub_updmc_topic_sender_t; -pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create(celix_bundle_context_t *ctx, /*TODO rest args*/ const char *scope, const char *topic, long serializerSvcId); +pubsub_updmc_topic_sender_t* pubsub_udpmcTopicSender_create( + celix_bundle_context_t *ctx, + const char *scope, + const char *topic, + long serializerSvcId, + int sendSocket, + const char *bindIP); void pubsub_udpmcTopicSender_destroy(pubsub_updmc_topic_sender_t *sender); const char* pubsub_udpmcTopicSender_psaType(pubsub_updmc_topic_sender_t *sender); @@ -31,6 +37,8 @@ const char* pubsub_udpmcTopicSender_serializerType(pubsub_updmc_topic_sender_t * const char* pubsub_udpmcTopicSender_scope(pubsub_updmc_topic_sender_t *sender); const char* pubsub_udpmcTopicSender_topic(pubsub_updmc_topic_sender_t *sender); const char* pubsub_udpmcTopicSender_socketAddress(pubsub_updmc_topic_sender_t *sender); +long pubsub_udpmcTopicSender_socketPort(pubsub_updmc_topic_sender_t *sender); + //TODO connections etc void pubsub_udpmcTopicSender_connectTo(pubsub_updmc_topic_sender_t *sender, const celix_properties_t *endpoint); http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c index eea8460..2fc80ee 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c @@ -76,7 +76,7 @@ typedef struct publish_bundle_bound_service { typedef struct pubsub_msg{ - pubsub_msg_header_pt header; + pubsub_msg_header_t *header; char* payload; unsigned int payloadSize; } pubsub_msg_t; http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c index 46a1688..387935e 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c @@ -110,7 +110,7 @@ typedef struct msg_map_entry{ static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service); static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service); static void* zmq_recv_thread_func(void* arg); -static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr); +static bool checkVersion(version_pt msgVersion,receiver hdr); static void sigusr1_sighandler(int signo); static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId); static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part); http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_discovery/src/psd_activator.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_discovery/src/psd_activator.c b/bundles/pubsub/pubsub_discovery/src/psd_activator.c index 8b2a2d4..81e467d 100644 --- a/bundles/pubsub/pubsub_discovery/src/psd_activator.c +++ b/bundles/pubsub/pubsub_discovery/src/psd_activator.c @@ -29,6 +29,7 @@ #include "pubsub_common.h" #include "pubsub_listeners.h" #include "pubsub_discovery_impl.h" +#include "../../../shell/shell/include/command.h" typedef struct psd_activator { pubsub_discovery_t *pubsub_discovery; @@ -38,6 +39,9 @@ typedef struct psd_activator { pubsub_announce_endpoint_listener_t listenerSvc; long listenerSvcId; + + command_service_t cmdSvc; + long cmdSvcId; } psd_activator_t; static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ctx) { @@ -48,7 +52,7 @@ static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ct status = pubsub_discovery_start(act->pubsub_discovery); celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; - opts.filter.serviceName = PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE; + opts.filter.serviceName = PUBSUB_DISCOVERED_ENDPOINT_LISTENER_SERVICE; opts.callbackHandle = act->pubsub_discovery; opts.addWithOwner = pubsub_discovery_discoveredEndpointsListenerAdded; opts.removeWithOwner = pubsub_discovery_discoveredEndpointsListenerRemoved; @@ -58,6 +62,18 @@ static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ct act->listenerSvc.announceEndpoint = pubsub_discovery_announceEndpoint; act->listenerSvc.removeEndpoint = pubsub_discovery_removeEndpoint; + //register shell command service + //register shell command + if (status == CELIX_SUCCESS) { + act->cmdSvc.handle = act->pubsub_discovery; + act->cmdSvc.executeCommand = pubsub_discovery_executeCommand; + celix_properties_t *props = celix_properties_create(); + properties_set(props, OSGI_SHELL_COMMAND_NAME, "psd_etcd"); + properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psd_etcd"); //TODO add search topic/scope option + properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Overview of discovered/announced endpoints from/to ETCD"); + act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props); + } + if (status == CELIX_SUCCESS) { act->listenerSvcId = celix_bundleContext_registerService(ctx, &act->listenerSvc, PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE, NULL); } else { @@ -70,6 +86,7 @@ static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ct static celix_status_t psd_stop(psd_activator_t *act, celix_bundle_context_t *ctx) { celix_bundleContext_stopTracker(ctx, act->publishAnnounceSvcTrackerId); celix_bundleContext_unregisterService(ctx, act->listenerSvcId); + celix_bundleContext_unregisterService(ctx, act->cmdSvcId); celix_status_t status = pubsub_discovery_stop(act->pubsub_discovery); pubsub_discovery_destroy(act->pubsub_discovery); http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c index 98d6be6..237f2f2 100644 --- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c +++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c @@ -63,8 +63,11 @@ celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discove celixThreadMutex_create(&(*ps_discovery)->discoveredEndpointsListenersMutex, NULL); celixThreadMutex_create(&(*ps_discovery)->announcedEndpointsMutex, NULL); celixThreadMutex_create(&(*ps_discovery)->discoveredEndpointsMutex, NULL); - celixThreadMutex_create(&(*ps_discovery)->waitMutex, NULL); - celixThreadCondition_init(&(*ps_discovery)->waitCond, NULL); + pthread_mutex_init(&(*ps_discovery)->waitMutex, NULL); + pthread_condattr_t attr; + pthread_condattr_init(&attr); + pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); + pthread_cond_init(&(*ps_discovery)->waitCond, &attr); celixThreadMutex_create(&(*ps_discovery)->runningMutex, NULL); (*ps_discovery)->running = true; @@ -102,8 +105,8 @@ celix_status_t pubsub_discovery_destroy(pubsub_discovery_t *ps_discovery) { celixThreadMutex_unlock(&ps_discovery->announcedEndpointsMutex); celixThreadMutex_destroy(&ps_discovery->announcedEndpointsMutex); - celixThreadMutex_destroy(&ps_discovery->waitMutex); - celixThreadCondition_destroy(&ps_discovery->waitCond); + pthread_mutex_destroy(&ps_discovery->waitMutex); + pthread_cond_destroy(&ps_discovery->waitCond); celixThreadMutex_destroy(&ps_discovery->runningMutex); @@ -246,7 +249,7 @@ void* psd_refresh(void *data) { while (running) { struct timespec start; - clock_gettime(CLOCK_REALTIME, &start); + clock_gettime(CLOCK_MONOTONIC, &start); celixThreadMutex_lock(&disc->announcedEndpointsMutex); hash_map_iterator_t iter = hashMapIterator_construct(disc->announcedEndpoints); @@ -273,9 +276,9 @@ void* psd_refresh(void *data) { struct timespec waitTill = start; waitTill.tv_sec += disc->sleepInsecBetweenTTLRefresh; - celixThreadMutex_lock(&disc->waitMutex); - pthread_cond_timedwait(&disc->waitCond, &disc->waitMutex, &waitTill); //TODO add timedwait abs for celixThread - celixThreadMutex_unlock(&disc->waitMutex); + pthread_mutex_lock(&disc->waitMutex); + pthread_cond_timedwait(&disc->waitCond, &disc->waitMutex, &waitTill); //TODO add timedwait abs for celixThread (including MONOTONIC ..) + pthread_mutex_unlock(&disc->waitMutex); celixThreadMutex_lock(&disc->runningMutex); running = disc->running; @@ -438,9 +441,9 @@ static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, cel assert(fwUUID != NULL); if (fwUUID != NULL && strncmp(disc->fwUUID, fwUUID, strlen(disc->fwUUID)) == 0) { - if (disc->verbose) { - printf("[PSD] Ignoring endpoint %s from own framework\n", uuid); - } +// if (disc->verbose) { +// printf("[PSD] Ignoring endpoint %s from own framework\n", uuid); +// } return; } @@ -553,3 +556,10 @@ static char* pubsub_discovery_createJsonEndpoint(const celix_properties_t *props json_decref(jsEndpoint); return str; } + +celix_status_t pubsub_discovery_executeCommand(void *handle, char * commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) { + //pubsub_discovery_t *psd = handle; + //TODO + fprintf(os, "TODO\n"); + return CELIX_SUCCESS; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h index f0a5b22..a1af837 100644 --- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h +++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h @@ -56,8 +56,9 @@ typedef struct pubsub_discovery { celix_thread_mutex_t discoveredEndpointsListenersMutex; hash_map_pt discoveredEndpointsListeners; //key=svcId, value=pubsub_discovered_endpoint_listener_t - celix_thread_mutex_t waitMutex; - celix_thread_cond_t waitCond; + //NOTE using pthread instead of celix mutex/cond so that condwait with abs time using a MONOTONIC clock can be used + pthread_mutex_t waitMutex; + pthread_cond_t waitCond; celix_thread_mutex_t runningMutex; bool running; @@ -91,4 +92,7 @@ void pubsub_discovery_discoveredEndpointsListenerRemoved(void *handle, void *svc celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_properties_t *endpoint); celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_properties_t *endpoint); +celix_status_t pubsub_discovery_executeCommand(void *handle, char * commandLine, FILE *os, FILE *errorStream); + + #endif /* PUBSUB_DISCOVERY_IMPL_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_spi/include/pubsub_admin.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h index 8c15fcf..9157861 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h @@ -46,7 +46,7 @@ struct pubsub_admin_service { celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId); celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId); - celix_status_t (*matchEndpoint)(void *handle, const celix_properties_t *endpoint, double *score); + celix_status_t (*matchEndpoint)(void *handle, const celix_properties_t *endpoint, bool *match); //note endpoint is owned by caller celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint); http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_spi/include/pubsub_common.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_common.h b/bundles/pubsub/pubsub_spi/include/pubsub_common.h index e551031..3231400 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_common.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_common.h @@ -40,7 +40,7 @@ struct pubsub_msg_header{ unsigned char minor; }; -typedef struct pubsub_msg_header* pubsub_msg_header_pt; +typedef struct pubsub_msg_header pubsub_msg_header_t; #endif /* PUBSUB_COMMON_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_spi/include/pubsub_utils.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h index 14f9bb0..66cc44a 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h @@ -60,13 +60,10 @@ double pubsub_utils_matchSubscriber( double defaultScore, long *outSerializerSvcId); -double pubsub_utils_matchEndpoint( +bool pubsub_utils_matchEndpoint( celix_bundle_context_t *ctx, const celix_properties_t *endpoint, const char *adminType, - double sampleScore, - double controlScore, - double defaultScore, long *outSerializerSvcId); http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c index 0b3c742..e10169f 100644 --- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c +++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c @@ -85,7 +85,14 @@ static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID, } } -celix_properties_t* pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, const char* pubsubType, const char* adminType, const char *serType, celix_properties_t *topic_props) { +celix_properties_t* pubsubEndpoint_create( + const char* fwUUID, + const char* scope, + const char* topic, + const char* pubsubType, + const char* adminType, + const char *serType, + celix_properties_t *topic_props) { celix_properties_t *ep = properties_create(); pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, adminType, serType, topic_props); if (!pubsubEndpoint_isValid(ep, true, true)) { http://git-wip-us.apache.org/repos/asf/celix/blob/2eb219e3/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c index 60f7883..c21d597 100644 --- a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c +++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c @@ -183,7 +183,7 @@ double pubsub_utils_matchSubscriber( score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match } - printf("Score subscriber service match for psa type %s is %f", adminType, score); + printf("Score subscriber service match for psa type %s is %f\n", adminType, score); if (outSerializerSvcId != NULL) { *outSerializerSvcId = serializerSvcId; @@ -196,35 +196,32 @@ double pubsub_utils_matchSubscriber( return score; } -double pubsub_utils_matchEndpoint( +bool pubsub_utils_matchEndpoint( celix_bundle_context_t *ctx, const celix_properties_t *ep, const char *adminType, - double sampleScore, - double controlScore, - double defaultScore, long *outSerializerSvcId) { - const char *requested_admin = NULL; - const char *requested_qos = NULL; - if (ep != NULL) { - requested_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL); - requested_qos = celix_properties_get(ep, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, NULL); + bool psaMatch = false; + const char *configured_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL); + if (configured_admin != NULL) { + psaMatch = strncmp(configured_admin, adminType, strlen(adminType)) == 0; } - double score = getPSAScore(requested_admin, requested_qos, adminType, sampleScore, controlScore, defaultScore); - - const char *requested_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL); - long serializerSvcId = getPSASerializer(ctx, requested_serializer); - if (serializerSvcId < 0) { - score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match + bool serMatch = false; + long serializerSvcId = -1L; + if (psaMatch) { + const char *configured_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL); + serializerSvcId = getPSASerializer(ctx, configured_serializer); + serMatch = serializerSvcId >= 0; } - printf("Score endpoint match for psa type %s is %f", adminType, score); + bool match = psaMatch && serMatch; + printf("Match for endpoint for psa type %s is %s\n", adminType, match ? "true" : "false"); if (outSerializerSvcId != NULL) { *outSerializerSvcId = serializerSvcId; } - return score; + return match; } \ No newline at end of file