http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c new file mode 100644 index 0000000..6d0768b --- /dev/null +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c @@ -0,0 +1,635 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * topic_subscription.c + * + * \date Oct 2, 2015 + * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#include <string.h> +#include <stdlib.h> +#include <unistd.h> +#include <signal.h> + +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/epoll.h> +#include <netinet/in.h> +#include <arpa/inet.h> + +#include "utils.h" +#include "celix_errno.h" +#include "constants.h" +#include "version.h" + +#include "topic_subscription.h" +#include "topic_publication.h" +#include "pubsub/subscriber.h" +#include "pubsub/publisher.h" +#include "large_udp.h" + +#include "pubsub_serializer.h" + +#define MAX_EPOLL_EVENTS 10 +#define RECV_THREAD_TIMEOUT 5 +#define UDP_BUFFER_SIZE 65535 +#define MAX_UDP_SESSIONS 16 + +struct topic_subscription{ + char* ifIpAddress; + service_tracker_pt tracker; + array_list_pt sub_ep_list; + celix_thread_t recv_thread; + bool running; + celix_thread_mutex_t ts_lock; + bundle_context_pt context; + + pubsub_serializer_service_t *serializer; + + int topicEpollFd; // EPOLL filedescriptor where the sockets are registered. + hash_map_pt servicesMap; // key = service, value = msg types map + hash_map_pt socketMap; // key = URL, value = listen-socket + celix_thread_mutex_t socketMap_lock; + + celix_thread_mutex_t pendingConnections_lock; + array_list_pt pendingConnections; + + array_list_pt pendingDisconnections; + celix_thread_mutex_t pendingDisconnections_lock; + + //array_list_pt rawServices; + unsigned int nrSubscribers; + largeUdp_pt largeUdpHandle; +}; + +typedef struct msg_map_entry{ + bool retain; + void* msgInst; +}* msg_map_entry_pt; + +static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service); +static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service); +static void* udp_recv_thread_func(void* arg); +static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr); +static void sigusr1_sighandler(int signo); +static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId); +static void connectPendingPublishers(topic_subscription_pt sub); +static void disconnectPendingPublishers(topic_subscription_pt sub); + + +celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* ifIp,char* scope, char* topic ,pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){ + celix_status_t status = CELIX_SUCCESS; + + topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts)); + ts->context = bundle_context; + ts->ifIpAddress = strdup(ifIp); +#if defined(__APPLE__) && defined(__MACH__) + //TODO: Use kqueue for OSX +#else + ts->topicEpollFd = epoll_create1(0); +#endif + if(ts->topicEpollFd == -1) { + status += CELIX_SERVICE_EXCEPTION; + } + + ts->running = false; + ts->nrSubscribers = 0; + ts->serializer = best_serializer; + + celixThreadMutex_create(&ts->ts_lock,NULL); + arrayList_create(&ts->sub_ep_list); + ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL); + ts->socketMap = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + + arrayList_create(&ts->pendingConnections); + arrayList_create(&ts->pendingDisconnections); + celixThreadMutex_create(&ts->pendingConnections_lock, NULL); + celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL); + celixThreadMutex_create(&ts->socketMap_lock, NULL); + + ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS); + + char filter[128]; + memset(filter,0,128); + if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, scope, strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) { + // default scope, means that subscriber has not defined a scope property + snprintf(filter, 128, "(&(%s=%s)(%s=%s))", + (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, + PUBSUB_SUBSCRIBER_TOPIC,topic); + + } else { + snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))", + (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, + PUBSUB_SUBSCRIBER_TOPIC,topic, + PUBSUB_SUBSCRIBER_SCOPE,scope); + } + + service_tracker_customizer_pt customizer = NULL; + status += serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer); + status += serviceTracker_createWithFilter(bundle_context, filter, customizer, &ts->tracker); + + struct sigaction actions; + memset(&actions, 0, sizeof(actions)); + sigemptyset(&actions.sa_mask); + actions.sa_flags = 0; + actions.sa_handler = sigusr1_sighandler; + + sigaction(SIGUSR1,&actions,NULL); + + if (status == CELIX_SUCCESS) { + *out=ts; + } + + return status; +} + +celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->ts_lock); + ts->running = false; + free(ts->ifIpAddress); + serviceTracker_destroy(ts->tracker); + arrayList_clear(ts->sub_ep_list); + arrayList_destroy(ts->sub_ep_list); + hashMap_destroy(ts->servicesMap,false,false); + + celixThreadMutex_lock(&ts->socketMap_lock); + hashMap_destroy(ts->socketMap,true,true); + celixThreadMutex_unlock(&ts->socketMap_lock); + celixThreadMutex_destroy(&ts->socketMap_lock); + + celixThreadMutex_lock(&ts->pendingConnections_lock); + arrayList_destroy(ts->pendingConnections); + celixThreadMutex_unlock(&ts->pendingConnections_lock); + celixThreadMutex_destroy(&ts->pendingConnections_lock); + + celixThreadMutex_lock(&ts->pendingDisconnections_lock); + arrayList_destroy(ts->pendingDisconnections); + celixThreadMutex_unlock(&ts->pendingDisconnections_lock); + celixThreadMutex_destroy(&ts->pendingDisconnections_lock); + + largeUdp_destroy(ts->largeUdpHandle); +#if defined(__APPLE__) && defined(__MACH__) + //TODO: Use kqueue for OSX +#else + close(ts->topicEpollFd); +#endif + + celixThreadMutex_unlock(&ts->ts_lock); + + celixThreadMutex_destroy(&ts->ts_lock); + + free(ts); + + return status; +} + +celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){ + celix_status_t status = CELIX_SUCCESS; + + status = serviceTracker_open(ts->tracker); + + ts->running = true; + + if(status==CELIX_SUCCESS){ + status=celixThread_create(&ts->recv_thread,NULL,udp_recv_thread_func,ts); + } + + return status; +} + +celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){ + celix_status_t status = CELIX_SUCCESS; + struct epoll_event ev; + memset(&ev, 0, sizeof(ev)); + + ts->running = false; + + pthread_kill(ts->recv_thread.thread,SIGUSR1); + + celixThread_join(ts->recv_thread,NULL); + + status = serviceTracker_close(ts->tracker); + + celixThreadMutex_lock(&ts->socketMap_lock); + hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap); + while(hashMapIterator_hasNext(it)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(it); + char *url = hashMapEntry_getKey(entry); + int *s = hashMapEntry_getValue(entry); + memset(&ev, 0, sizeof(ev)); + if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) { + printf("in if error()\n"); + perror("epoll_ctl() EPOLL_CTL_DEL"); + status += CELIX_SERVICE_EXCEPTION; + } + free(s); + free(url); + //hashMapIterator_remove(it); + } + hashMapIterator_destroy(it); + hashMap_clear(ts->socketMap, false, false); + celixThreadMutex_unlock(&ts->socketMap_lock); + + + return status; +} + +celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL) { + + printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", pubURL); + + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->socketMap_lock); + + if(!hashMap_containsKey(ts->socketMap, pubURL)){ + + int *recvSocket = calloc(sizeof(int), 1); + *recvSocket = socket(AF_INET, SOCK_DGRAM, 0); + if (*recvSocket < 0) { + perror("pubsub_topicSubscriptionCreate:socket"); + status = CELIX_SERVICE_EXCEPTION; + } + + if (status == CELIX_SUCCESS){ + int reuse = 1; + if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) { + perror("setsockopt() SO_REUSEADDR"); + status = CELIX_SERVICE_EXCEPTION; + } + } + + if(status == CELIX_SUCCESS){ + // TODO Check if there is a better way to parse the URL to IP/Portnr + //replace ':' by spaces + char *url = strdup(pubURL); + char *pt = url; + while((pt=strchr(pt, ':')) != NULL) { + *pt = ' '; + } + char mcIp[100]; + unsigned short mcPort; + sscanf(url, "udp //%s %hu", mcIp, &mcPort); + free(url); + + printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort); + + struct ip_mreq mc_addr; + mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp); + mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress); + printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress); + if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) { + perror("setsockopt() IP_ADD_MEMBERSHIP"); + status = CELIX_SERVICE_EXCEPTION; + } + + if (status == CELIX_SUCCESS){ + struct sockaddr_in mcListenAddr; + mcListenAddr.sin_family = AF_INET; + mcListenAddr.sin_addr.s_addr = INADDR_ANY; + mcListenAddr.sin_port = htons(mcPort); + if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) { + perror("bind()"); + status = CELIX_SERVICE_EXCEPTION; + } + } + + if (status == CELIX_SUCCESS){ +#if defined(__APPLE__) && defined(__MACH__) + //TODO: Use kqueue for OSX +#else + struct epoll_event ev; + memset(&ev, 0, sizeof(ev)); + ev.events = EPOLLIN; + ev.data.fd = *recvSocket; + if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) { + perror("epoll_ctl() EPOLL_CTL_ADD"); + status = CELIX_SERVICE_EXCEPTION; + } +#endif + } + + } + + if (status == CELIX_SUCCESS){ + hashMap_put(ts->socketMap, strdup(pubURL), (void*)recvSocket); + } + else{ + free(recvSocket); + } + } + + celixThreadMutex_unlock(&ts->socketMap_lock); + + return status; +} + +celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { + celix_status_t status = CELIX_SUCCESS; + char *url = strdup(pubURL); + celixThreadMutex_lock(&ts->pendingConnections_lock); + arrayList_add(ts->pendingConnections, url); + celixThreadMutex_unlock(&ts->pendingConnections_lock); + return status; +} + +celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { + celix_status_t status = CELIX_SUCCESS; + char *url = strdup(pubURL); + celixThreadMutex_lock(&ts->pendingDisconnections_lock); + arrayList_add(ts->pendingDisconnections, url); + celixThreadMutex_unlock(&ts->pendingDisconnections_lock); + return status; +} + +celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){ + printf("pubsub_topicSubscriptionDisconnectPublisher : pubURL = %s\n", pubURL); + celix_status_t status = CELIX_SUCCESS; + struct epoll_event ev; + memset(&ev, 0, sizeof(ev)); + + celixThreadMutex_lock(&ts->socketMap_lock); + + if (hashMap_containsKey(ts->socketMap, pubURL)){ + +#if defined(__APPLE__) && defined(__MACH__) + //TODO: Use kqueue for OSX +#else + int *s = hashMap_remove(ts->socketMap, pubURL); + if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) { + printf("in if error()\n"); + perror("epoll_ctl() EPOLL_CTL_DEL"); + status = CELIX_SERVICE_EXCEPTION; + } + free(s); +#endif + + } + + celixThreadMutex_unlock(&ts->socketMap_lock); + + return status; +} + +celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->ts_lock); + arrayList_add(ts->sub_ep_list,subEP); + celixThreadMutex_unlock(&ts->ts_lock); + + return status; + +} + +celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) { + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->ts_lock); + ts->nrSubscribers++; + celixThreadMutex_unlock(&ts->ts_lock); + + return status; +} + +celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->ts_lock); + arrayList_removeElement(ts->sub_ep_list,subEP); + celixThreadMutex_unlock(&ts->ts_lock); + + return status; +} + +celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) { + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->ts_lock); + ts->nrSubscribers--; + celixThreadMutex_unlock(&ts->ts_lock); + + return status; +} + +unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) { + return ts->nrSubscribers; +} + +array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub){ + return sub->sub_ep_list; +} + + +static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){ + celix_status_t status = CELIX_SUCCESS; + topic_subscription_pt ts = handle; + + celixThreadMutex_lock(&ts->ts_lock); + if (!hashMap_containsKey(ts->servicesMap, service)) { + bundle_pt bundle = NULL; + hash_map_pt msgTypes = NULL; + + serviceReference_getBundle(reference, &bundle); + + if(ts->serializer != NULL && bundle!=NULL){ + ts->serializer->createSerializerMap(ts->serializer->handle,bundle,&msgTypes); + if(msgTypes != NULL){ + hashMap_put(ts->servicesMap, service, msgTypes); + printf("PSA_UDP_MC_TS: New subscriber registered.\n"); + } + } + else{ + printf("PSA_UDP_MC_TS: Cannot register new subscriber.\n"); + status = CELIX_SERVICE_EXCEPTION; + } + } + celixThreadMutex_unlock(&ts->ts_lock); + + return status; + +} + +static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){ + celix_status_t status = CELIX_SUCCESS; + topic_subscription_pt ts = handle; + + celixThreadMutex_lock(&ts->ts_lock); + if (hashMap_containsKey(ts->servicesMap, service)) { + hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service); + if(msgTypes!=NULL && ts->serializer!=NULL){ + ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes); + printf("PSA_ZMQ_TS: Subscriber unregistered.\n"); + } + else{ + printf("PSA_ZMQ_TS: Cannot unregister subscriber.\n"); + status = CELIX_SERVICE_EXCEPTION; + } + } + celixThreadMutex_unlock(&ts->ts_lock); + + printf("PSA_UDP_MC_TS: Subscriber unregistered.\n"); + return status; +} + + +static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_t *msg){ + + celixThreadMutex_lock(&sub->ts_lock); + hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap); + while (hashMapIterator_hasNext(iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry); + hash_map_pt msgTypes = hashMapEntry_getValue(entry); + + pubsub_msg_serializer_t *msgSer = hashMap_get(msgTypes,(void*)(uintptr_t )msg->header.type); + if (msgSer == NULL) { + printf("PSA_UDP_MC_TS: Serializer not available for message %d.\n",msg->header.type); + } + else{ + void *msgInst = NULL; + bool validVersion = checkVersion(msgSer->msgVersion,&msg->header); + + if(validVersion){ + + celix_status_t status = msgSer->deserialize(msgSer, (const void *) msg->payload, 0, &msgInst); + + if (status == CELIX_SUCCESS) { + bool release = true; + pubsub_multipart_callbacks_t mp_callbacks; + mp_callbacks.handle = sub; + mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType; + mp_callbacks.getMultipart = NULL; + + subsvc->receive(subsvc->handle, msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release); + + if(release){ + msgSer->freeMsg(msgSer,msgInst); + } + } + else{ + printf("PSA_UDP_MC_TS: Cannot deserialize msgType %s.\n",msgSer->msgName); + } + + } + else{ + int major=0,minor=0; + version_getMajor(msgSer->msgVersion,&major); + version_getMinor(msgSer->msgVersion,&minor); + printf("PSA_UDP_MC_TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n", + msgSer->msgName,major,minor,msg->header.major,msg->header.minor); + } + + } + } + hashMapIterator_destroy(iter); + celixThreadMutex_unlock(&sub->ts_lock); +} + +static void* udp_recv_thread_func(void * arg) { + topic_subscription_pt sub = (topic_subscription_pt) arg; + +#if defined(__APPLE__) && defined(__MACH__) + //TODO: use kqueue for OSX + //struct kevent events[MAX_EPOLL_EVENTS]; + while (sub->running) { + int nfds = 0; + if(nfds > 0) { + pubsub_udp_msg_t* udpMsg = NULL; + process_msg(sub, udpMsg); + } + } +#else + struct epoll_event events[MAX_EPOLL_EVENTS]; + + while (sub->running) { + int nfds = epoll_wait(sub->topicEpollFd, events, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000); + int i; + for(i = 0; i < nfds; i++ ) { + unsigned int index; + unsigned int size; + if(largeUdp_dataAvailable(sub->largeUdpHandle, events[i].data.fd, &index, &size) == true) { + // Handle data + pubsub_udp_msg_t *udpMsg = NULL; + if(largeUdp_read(sub->largeUdpHandle, index, (void**)&udpMsg, size) != 0) { + printf("PSA_UDP_MC_TS: ERROR largeUdp_read with index %d\n", index); + continue; + } + + process_msg(sub, udpMsg); + + free(udpMsg); + } + } + connectPendingPublishers(sub); + disconnectPendingPublishers(sub); + } +#endif + + return NULL; +} + +static void connectPendingPublishers(topic_subscription_pt sub) { + celixThreadMutex_lock(&sub->pendingConnections_lock); + while(!arrayList_isEmpty(sub->pendingConnections)) { + char * pubEP = arrayList_remove(sub->pendingConnections, 0); + pubsub_topicSubscriptionConnectPublisher(sub, pubEP); + free(pubEP); + } + celixThreadMutex_unlock(&sub->pendingConnections_lock); +} + +static void disconnectPendingPublishers(topic_subscription_pt sub) { + celixThreadMutex_lock(&sub->pendingDisconnections_lock); + while(!arrayList_isEmpty(sub->pendingDisconnections)) { + char * pubEP = arrayList_remove(sub->pendingDisconnections, 0); + pubsub_topicSubscriptionDisconnectPublisher(sub, pubEP); + free(pubEP); + } + celixThreadMutex_unlock(&sub->pendingDisconnections_lock); +} + +static void sigusr1_sighandler(int signo){ + printf("PSA_UDP_MC_TS: Topic subscription being shut down...\n"); + return; +} + +static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){ + bool check=false; + int major=0,minor=0; + + if(msgVersion!=NULL){ + version_getMajor(msgVersion,&major); + version_getMinor(msgVersion,&minor); + if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */ + check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */ + } + } + + return check; +} + +static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){ + *msgTypeId = utils_stringHash(msgType); + return 0; +}
http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h new file mode 100644 index 0000000..475416a --- /dev/null +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h @@ -0,0 +1,60 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * topic_subscription.h + * + * \date Sep 22, 2015 + * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#ifndef TOPIC_SUBSCRIPTION_H_ +#define TOPIC_SUBSCRIPTION_H_ + +#include "celix_threads.h" +#include "array_list.h" +#include "celixbool.h" +#include "service_tracker.h" + +#include "pubsub_endpoint.h" +#include "pubsub_common.h" +#include "pubsub_serializer.h" + +typedef struct topic_subscription* topic_subscription_pt; + +celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* ifIp,char* scope, char* topic ,pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out); +celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts); +celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts); +celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts); + +celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL); +celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL); + +celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL); +celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL); + +celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); +celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); + +array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub); +celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription); +celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription); +unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription); + +#endif /*TOPIC_SUBSCRIPTION_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt b/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt new file mode 100644 index 0000000..ddc17eb --- /dev/null +++ b/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +if (BUILD_PUBSUB_PSA_ZMQ) + + find_package(ZMQ REQUIRED) + find_package(CZMQ REQUIRED) + find_package(Jansson REQUIRED) + + if (BUILD_ZMQ_SECURITY) + add_definitions(-DBUILD_WITH_ZMQ_SECURITY=1) + + find_package(OpenSSL 1.1.0 REQUIRED) + include_directories("${OPENSSL_INCLUDE_DIR}") + + set (ZMQ_CRYPTO_C "private/src/zmq_crypto.c") + endif() + + add_celix_bundle(celix_pubsub_admin_zmq + BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq" + VERSION "1.0.0" + SOURCES + src/psa_activator.c + src/pubsub_admin_impl.c + src/topic_subscription.c + src/topic_publication.c + ${ZMQ_CRYPTO_C} + ) + + set_target_properties(celix_pubsub_admin_zmq PROPERTIES INSTALL_RPATH "$ORIGIN") + target_link_libraries(celix_pubsub_admin_zmq PRIVATE + Celix::pubsub_spi + Celix::framework Celix::dfi Celix::log_helper + ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY} + ) + target_include_directories(celix_pubsub_admin_zmq PRIVATE + ${ZMQ_INCLUDE_DIR} + ${CZMQ_INCLUDE_DIR} + ${JANSSON_INCLUDE_DIR} + src + ) + + install_celix_bundle(celix_pubsub_admin_zmq EXPORT celix COMPONENT pubsub) + target_link_libraries(celix_pubsub_admin_zmq PRIVATE Celix::shell_api) + add_library(Celix::pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq) +endif() http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c new file mode 100644 index 0000000..008dff5 --- /dev/null +++ b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c @@ -0,0 +1,186 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * psa_activator.c + * + * \date Sep 30, 2011 + * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#include <stdlib.h> + +#include "bundle_activator.h" +#include "service_tracker.h" + +#include "pubsub_admin_impl.h" + + +struct activator { + pubsub_admin_pt admin; + pubsub_admin_service_pt adminService; + service_registration_pt registration; + service_tracker_pt serializerTracker; +}; + + +static celix_status_t shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream) { + struct activator *act= (struct activator*)handle; + if (act->admin->externalPublications && !hashMap_isEmpty(act->admin->externalPublications)) { + fprintf(outStream, "External Publications:\n"); + for(hash_map_iterator_t iter = hashMapIterator_construct(act->admin->externalPublications); hashMapIterator_hasNext(&iter);) { + const char* key = (const char*)hashMapIterator_nextKey(&iter); + fprintf(outStream, " %s\n", key); + } + } + if (act->admin->localPublications && !hashMap_isEmpty(act->admin->localPublications)) { + fprintf(outStream, "Local Publications:\n"); + for (hash_map_iterator_t iter = hashMapIterator_construct( + act->admin->localPublications); hashMapIterator_hasNext(&iter);) { + const char *key = (const char *) hashMapIterator_nextKey(&iter); + fprintf(outStream, " %s\n", key); + } + } + if (act->admin->subscriptions && !hashMap_isEmpty(act->admin->subscriptions)) { + fprintf(outStream, "Active Subscriptions:\n"); + for (hash_map_iterator_t iter = hashMapIterator_construct( + act->admin->subscriptions); hashMapIterator_hasNext(&iter);) { + const char *key = (const char *) hashMapIterator_nextKey(&iter); + fprintf(outStream, " %s\n", key); + } + } + if (act->admin->pendingSubscriptions && !hashMap_isEmpty(act->admin->pendingSubscriptions)) { + fprintf(outStream, "Pending Subscriptions:\n"); + for (hash_map_iterator_t iter = hashMapIterator_construct( + act->admin->pendingSubscriptions); hashMapIterator_hasNext(&iter);) { + const char *key = (const char *) hashMapIterator_nextKey(&iter); + fprintf(outStream, " %s\n", key); + } + } + return CELIX_SUCCESS; +} + +celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { + celix_status_t status = CELIX_SUCCESS; + struct activator *activator; + + activator = calloc(1, sizeof(*activator)); + if (!activator) { + status = CELIX_ENOMEM; + } + else{ + *userData = activator; + + status = pubsubAdmin_create(context, &(activator->admin)); + + if(status == CELIX_SUCCESS){ + service_tracker_customizer_pt customizer = NULL; + status = serviceTrackerCustomizer_create(activator->admin, + NULL, + pubsubAdmin_serializerAdded, + NULL, + pubsubAdmin_serializerRemoved, + &customizer); + if(status == CELIX_SUCCESS){ + status = serviceTracker_create(context, PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker)); + if (status == CELIX_SUCCESS) { + properties_pt shellProps = properties_create(); + properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_zmq_info"); + properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_zmq_info"); + properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "psa_zmq_info: Overview of PubSub ZMQ Admin"); + activator->admin->shellCmdService.handle = activator; + activator->admin->shellCmdService.executeCommand = shellCommand; + bundleContext_registerService(context, OSGI_SHELL_COMMAND_SERVICE_NAME, &activator->admin->shellCmdService, shellProps, &activator->admin->shellCmdReg); + } else { + serviceTrackerCustomizer_destroy(customizer); + pubsubAdmin_destroy(activator->admin); + } + } + else{ + pubsubAdmin_destroy(activator->admin); + } + } + } + return status; +} + +celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) { + celix_status_t status = CELIX_SUCCESS; + struct activator *activator = userData; + pubsub_admin_service_pt pubsubAdminSvc = calloc(1, sizeof(*pubsubAdminSvc)); + + if (!pubsubAdminSvc) { + status = CELIX_ENOMEM; + } + else{ + pubsubAdminSvc->admin = activator->admin; + + pubsubAdminSvc->addPublication = pubsubAdmin_addPublication; + pubsubAdminSvc->removePublication = pubsubAdmin_removePublication; + + pubsubAdminSvc->addSubscription = pubsubAdmin_addSubscription; + pubsubAdminSvc->removeSubscription = pubsubAdmin_removeSubscription; + + pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications; + pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions; + + pubsubAdminSvc->matchEndpoint = pubsubAdmin_matchEndpoint; + + activator->adminService = pubsubAdminSvc; + + status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration); + + status += serviceTracker_open(activator->serializerTracker); + + } + + + return status; +} + +celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) { + celix_status_t status = CELIX_SUCCESS; + struct activator *activator = userData; + + status += serviceTracker_close(activator->serializerTracker); + serviceRegistration_unregister(activator->admin->shellCmdReg); + activator->admin->shellCmdReg = NULL; + status += serviceRegistration_unregister(activator->registration); + + activator->registration = NULL; + + free(activator->adminService); + activator->adminService = NULL; + + return status; +} + +celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) { + celix_status_t status = CELIX_SUCCESS; + struct activator *activator = userData; + + serviceTracker_destroy(activator->serializerTracker); + pubsubAdmin_destroy(activator->admin); + activator->admin = NULL; + free(activator); + + return status; +} + + http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c new file mode 100644 index 0000000..e9bb6c3 --- /dev/null +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c @@ -0,0 +1,1183 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * pubsub_admin_impl.c + * + * \date Sep 30, 2011 + * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#include "pubsub_admin_impl.h" +#include <zmq.h> + +#include <stdio.h> +#include <stdlib.h> + +#include <arpa/inet.h> +#include <sys/socket.h> +#include <netdb.h> + +#ifndef ANDROID +#include <ifaddrs.h> +#endif + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> + +#include "constants.h" +#include "utils.h" +#include "hash_map.h" +#include "array_list.h" +#include "bundle_context.h" +#include "bundle.h" +#include "service_reference.h" +#include "service_registration.h" +#include "log_helper.h" +#include "log_service.h" +#include "celix_threads.h" +#include "service_factory.h" + +#include "topic_subscription.h" +#include "topic_publication.h" +#include "pubsub_endpoint.h" +#include "pubsub_utils.h" +#include "pubsub/subscriber.h" + +#define MAX_KEY_FOLDER_PATH_LENGTH 512 + +static const char *DEFAULT_IP = "127.0.0.1"; + +static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip); +static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); + +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **svcOut, const char **serTypeOut); +static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication); +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication); + +celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) { + celix_status_t status = CELIX_SUCCESS; + +#ifdef BUILD_WITH_ZMQ_SECURITY + if (!zsys_has_curve()){ + printf("PSA_ZMQ: zeromq curve unsupported\n"); + return CELIX_SERVICE_EXCEPTION; + } +#endif + + *admin = calloc(1, sizeof(**admin)); + + if (!*admin){ + return CELIX_ENOMEM; + } + + const char *ip = NULL; + char *detectedIp = NULL; + (*admin)->bundle_context= context; + (*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL); + (*admin)->topicPublicationsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL); + arrayList_create(&((*admin)->noSerializerSubscriptions)); + arrayList_create(&((*admin)->noSerializerPublications)); + arrayList_create(&((*admin)->serializerList)); + + celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL); + celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL); + celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL); + celixThreadMutex_create(&(*admin)->serializerListLock, NULL); + celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL); + + celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr); + celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, CELIX_THREAD_MUTEX_RECURSIVE); + celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, &(*admin)->noSerializerPendingsAttr); + + celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr); + celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE); + celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr); + + if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) { + logHelper_start((*admin)->loghelper); + } + + bundleContext_getProperty(context,PSA_IP , &ip); + +#ifndef ANDROID + if (ip == NULL) { + const char *interface = NULL; + + bundleContext_getProperty(context, PSA_ITF, &interface); + if (pubsubAdmin_getIpAdress(interface, &detectedIp) != CELIX_SUCCESS) { + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: Could not retrieve IP adress for interface %s", interface); + } + + ip = detectedIp; + } +#endif + + if (ip != NULL) { + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %s for service annunciation", ip); + (*admin)->ipAddress = strdup(ip); + } + else { + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: No IP address for service annunciation set. Using %s", DEFAULT_IP); + (*admin)->ipAddress = strdup(DEFAULT_IP); + } + + if (detectedIp != NULL) { + free(detectedIp); + } + + const char* basePortStr = NULL; + const char* maxPortStr = NULL; + char* endptrBase = NULL; + char* endptrMax = NULL; + bundleContext_getPropertyWithDefault(context, PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr); + bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, "PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr); + (*admin)->basePort = strtol(basePortStr, &endptrBase, 10); + (*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10); + if (*endptrBase != '\0') { + (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT; + } + if (*endptrMax != '\0') { + (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT; + } + + // Disable Signal Handling by CZMQ + setenv("ZSYS_SIGHANDLER", "false", true); + + const char *nrZmqThreads = NULL; + bundleContext_getProperty(context, "PSA_NR_ZMQ_THREADS", &nrZmqThreads); + + if(nrZmqThreads != NULL) { + char *endPtr = NULL; + unsigned int nrThreads = strtoul(nrZmqThreads, &endPtr, 10); + if(endPtr != nrZmqThreads && nrThreads > 0 && nrThreads < 50) { + zsys_set_io_threads(nrThreads); + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %d threads for ZMQ", nrThreads); + printf("PSA_ZMQ: Using %d threads for ZMQ\n", nrThreads); + } + } + +#ifdef BUILD_WITH_ZMQ_SECURITY + // Setup authenticator + zactor_t* auth = zactor_new (zauth, NULL); + zstr_sendx(auth, "VERBOSE", NULL); + + // Load all public keys of subscribers into the application + // This step is done for authenticating subscribers + char curve_folder_path[MAX_KEY_FOLDER_PATH_LENGTH]; + char* keys_bundle_dir = pubsub_getKeysBundleDir(context); + snprintf(curve_folder_path, MAX_KEY_FOLDER_PATH_LENGTH, "%s/META-INF/keys/subscriber/public", keys_bundle_dir); + zstr_sendx (auth, "CURVE", curve_folder_path, NULL); + free(keys_bundle_dir); + + (*admin)->zmq_auth = auth; +#endif + + + (*admin)->defaultScore = PSA_ZMQ_DEFAULT_SCORE; + (*admin)->qosSampleScore = PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE; + (*admin)->qosControlScore = PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE; + + const char *defaultScoreStr = NULL; + const char *sampleScoreStr = NULL; + const char *controlScoreStr = NULL; + bundleContext_getProperty(context, PSA_ZMQ_DEFAULT_SCORE_KEY, &defaultScoreStr); + bundleContext_getProperty(context, PSA_ZMQ_QOS_SAMPLE_SCORE_KEY, &sampleScoreStr); + bundleContext_getProperty(context, PSA_ZMQ_QOS_CONTROL_SCORE_KEY, &controlScoreStr); + + if (defaultScoreStr != NULL) { + (*admin)->defaultScore = strtof(defaultScoreStr, NULL); + } + if (sampleScoreStr != NULL) { + (*admin)->qosSampleScore = strtof(sampleScoreStr, NULL); + } + if (controlScoreStr != NULL) { + (*admin)->qosControlScore = strtof(controlScoreStr, NULL); + } + + (*admin)->verbose = PSA_ZMQ_DEFAULT_VERBOSE; + const char *verboseStr = NULL; + bundleContext_getProperty(context, PSA_ZMQ_VERBOSE_KEY, &verboseStr); + if (verboseStr != NULL) { + (*admin)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0; + } + + if ((*admin)->verbose) { + printf("PSA ZMQ Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort); + } + + return status; +} + + +celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin) +{ + celix_status_t status = CELIX_SUCCESS; + + free(admin->ipAddress); + + celixThreadMutex_lock(&admin->pendingSubscriptionsLock); + hash_map_iterator_pt iter = hashMapIterator_create(admin->pendingSubscriptions); + while(hashMapIterator_hasNext(iter)){ + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + free((char*)hashMapEntry_getKey(entry)); + arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(admin->pendingSubscriptions,false,false); + celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); + + celixThreadMutex_lock(&admin->subscriptionsLock); + hashMap_destroy(admin->subscriptions,false,false); + celixThreadMutex_unlock(&admin->subscriptionsLock); + + celixThreadMutex_lock(&admin->localPublicationsLock); + hashMap_destroy(admin->localPublications,true,false); + celixThreadMutex_unlock(&admin->localPublicationsLock); + + celixThreadMutex_lock(&admin->externalPublicationsLock); + iter = hashMapIterator_create(admin->externalPublications); + while(hashMapIterator_hasNext(iter)){ + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + free((char*)hashMapEntry_getKey(entry)); + arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(admin->externalPublications,false,false); + celixThreadMutex_unlock(&admin->externalPublicationsLock); + + celixThreadMutex_lock(&admin->serializerListLock); + arrayList_destroy(admin->serializerList); + celixThreadMutex_unlock(&admin->serializerListLock); + + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_destroy(admin->noSerializerSubscriptions); + arrayList_destroy(admin->noSerializerPublications); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + + + celixThreadMutex_lock(&admin->usedSerializersLock); + + iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer); + while(hashMapIterator_hasNext(iter)){ + arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false); + + iter = hashMapIterator_create(admin->topicPublicationsPerSerializer); + while(hashMapIterator_hasNext(iter)){ + arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(admin->topicPublicationsPerSerializer,false,false); + + celixThreadMutex_unlock(&admin->usedSerializersLock); + + celixThreadMutex_destroy(&admin->usedSerializersLock); + celixThreadMutex_destroy(&admin->serializerListLock); + celixThreadMutex_destroy(&admin->pendingSubscriptionsLock); + + celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr); + celixThreadMutex_destroy(&admin->noSerializerPendingsLock); + + celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr); + celixThreadMutex_destroy(&admin->subscriptionsLock); + + celixThreadMutex_destroy(&admin->localPublicationsLock); + celixThreadMutex_destroy(&admin->externalPublicationsLock); + + logHelper_stop(admin->loghelper); + + logHelper_destroy(&admin->loghelper); + +#ifdef BUILD_WITH_ZMQ_SECURITY + if (admin->zmq_auth != NULL){ + zactor_destroy(&(admin->zmq_auth)); + } +#endif + + free(admin); + + return status; +} + +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&admin->subscriptionsLock); + + topic_subscription_pt any_sub = hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC); + + if(any_sub==NULL){ + + int i; + pubsub_serializer_service_t *best_serializer = NULL; + const char *serType = NULL; + if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){ + status = pubsub_topicSubscriptionCreate(admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, serType, &any_sub); + } + else{ + printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n", + properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerSubscriptions,subEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + + if (status == CELIX_SUCCESS){ + + /* Connect all internal publishers */ + celixThreadMutex_lock(&admin->localPublicationsLock); + hash_map_iterator_pt lp_iter =hashMapIterator_create(admin->localPublications); + while(hashMapIterator_hasNext(lp_iter)){ + service_factory_pt factory = (service_factory_pt)hashMapIterator_nextValue(lp_iter); + topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; + array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); + + if(topic_publishers!=NULL){ + for(i=0;i<arrayList_size(topic_publishers);i++){ + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i); + if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){ + status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + } + arrayList_destroy(topic_publishers); + } + } + hashMapIterator_destroy(lp_iter); + celixThreadMutex_unlock(&admin->localPublicationsLock); + + /* Connect also all external publishers */ + celixThreadMutex_lock(&admin->externalPublicationsLock); + hash_map_iterator_pt extp_iter =hashMapIterator_create(admin->externalPublications); + while(hashMapIterator_hasNext(extp_iter)){ + array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter); + if(ext_pub_list!=NULL){ + for(i=0;i<arrayList_size(ext_pub_list);i++){ + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); + if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){ + status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + } + } + } + hashMapIterator_destroy(extp_iter); + celixThreadMutex_unlock(&admin->externalPublicationsLock); + + + pubsub_topicSubscriptionAddSubscriber(any_sub,subEP); + + status += pubsub_topicSubscriptionStart(any_sub); + + } + + if (status == CELIX_SUCCESS){ + hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub); + connectTopicPubSubToSerializer(admin, best_serializer, any_sub, false); + } + + } + + celixThreadMutex_unlock(&admin->subscriptionsLock); + + return status; +} + +/** + * A subcriber service is registered and this PSA had won the match + * (based on qos or other meta data) + * Will update the pubsub endpoint with the chosen pubsub admin and serializer type + */ +celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + + if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), PUBSUB_ANY_SUB_TOPIC)==0) { + return pubsubAdmin_addAnySubscription(admin,subEP); + } + + /* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */ + celixThreadMutex_lock(&admin->pendingSubscriptionsLock); + celixThreadMutex_lock(&admin->subscriptionsLock); + celixThreadMutex_lock(&admin->localPublicationsLock); + celixThreadMutex_lock(&admin->externalPublicationsLock); + + char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + + service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); + array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); + + if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic + pubsubAdmin_addSubscriptionToPendingList(admin,subEP); + } + else{ + int i; + topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic); + + if(subscription == NULL) { + pubsub_serializer_service_t *best_serializer = NULL; + const char *serType = NULL; + if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){ + status += pubsub_topicSubscriptionCreate(admin->bundle_context, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, serType, &subscription); + } + else{ + printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n", + properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerSubscriptions,subEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + + if (status==CELIX_SUCCESS){ + + /* Try to connect internal publishers */ + if(factory!=NULL){ + topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; + array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); + + if(topic_publishers!=NULL){ + for(i=0;i<arrayList_size(topic_publishers);i++){ + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i); + if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){ + status += pubsub_topicSubscriptionConnectPublisher(subscription,(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + } + arrayList_destroy(topic_publishers); + } + + } + + /* Look also for external publishers */ + if(ext_pub_list!=NULL){ + for(i=0;i<arrayList_size(ext_pub_list);i++){ + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); + if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){ + status += pubsub_topicSubscriptionConnectPublisher(subscription,(char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + } + } + + pubsub_topicSubscriptionAddSubscriber(subscription,subEP); + + status += pubsub_topicSubscriptionStart(subscription); + + } + + if(status==CELIX_SUCCESS){ + + hashMap_put(admin->subscriptions,strdup(scope_topic),subscription); + + connectTopicPubSubToSerializer(admin, best_serializer, subscription, false); + } + } + + if (status == CELIX_SUCCESS){ + pubsub_topicIncreaseNrSubscribers(subscription); + } + } + + free(scope_topic); + celixThreadMutex_unlock(&admin->externalPublicationsLock); + celixThreadMutex_unlock(&admin->localPublicationsLock); + celixThreadMutex_unlock(&admin->subscriptionsLock); + celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); + + + if (admin->verbose) { + printf("PSA_ZMQ: Added subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n", + properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), + properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID), + properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n", + properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY), + properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY), + properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE)); + printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + + return status; + +} + +celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + + if (admin->verbose) { + printf("PSA_ZMQ: Removing subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n", + properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), + properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID), + properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n", + properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY), + properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY), + properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE)); + printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + + char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + + celixThreadMutex_lock(&admin->subscriptionsLock); + topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); + if(sub!=NULL){ + pubsub_topicDecreaseNrSubscribers(sub); + if(pubsub_topicGetNrSubscribers(sub) == 0) { + status = pubsub_topicSubscriptionRemoveSubscriber(sub,subEP); + } + } + celixThreadMutex_unlock(&admin->subscriptionsLock); + + if(sub==NULL){ + /* Maybe the endpoint was pending */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + if(!arrayList_removeElement(admin->noSerializerSubscriptions, subEP)){ + status = CELIX_ILLEGAL_STATE; + } + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + + free(scope_topic); + + + + return status; + +} + + +/** + * A bundle has shown interested in a publisher service and this PSA had won the match + * based on filter or embedded topic.properties (extender pattern) + * OR !! + * A remote publication has been discovered and forwarded to this call + * Will update the pubsub endpoint with the chosen pubsub admin and serializer type + */ +celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP) { + celix_status_t status = CELIX_SUCCESS; + + const char *fwUUID = NULL; + bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); + if (fwUUID == NULL) { + printf("PSA_ZMQ: Cannot retrieve fwUUID.\n"); + return CELIX_INVALID_BUNDLE_CONTEXT; + } + + const char *epFwUUID = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID); + bool isOwn = strncmp(fwUUID, epFwUUID, 128) == 0; + + if (isOwn) { + //should be null, willl be set in this call + assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) == NULL); + assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) == NULL); + } else { + //inverse ADMIN_TYPE_KEY and SERIALIZER_TYPE shoudl not be null + assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) != NULL); + assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) != NULL); + } + + if (isOwn) { + properties_set(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY, PSA_ZMQ_PUBSUB_ADMIN_TYPE); + } + + + char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + + if ((strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) && + (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) { + + celixThreadMutex_lock(&admin->localPublicationsLock); + + service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic); + + if (factory == NULL) { + topic_publication_pt pub = NULL; + pubsub_serializer_service_t *best_serializer = NULL; + const char *serType = NULL; + if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer, &serType)) == CELIX_SUCCESS){ + status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, serType, admin->ipAddress, admin->basePort, admin->maxPort, &pub); + if (isOwn) { + properties_set(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY, serType); + } + } + else { + printf("PSA_ZMQ: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n", + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerPublications,pubEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + + if (status == CELIX_SUCCESS) { + status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory); + if (status == CELIX_SUCCESS && factory != NULL) { + hashMap_put(admin->localPublications, strdup(scope_topic), factory); + connectTopicPubSubToSerializer(admin, best_serializer, pub, true); + } + } else { + printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %s).\n", + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), + properties_get(pubEP->endpoint_props, PUBSUB_BUNDLE_ID)); + } + } else { + //just add the new EP to the list + topic_publication_pt pub = (topic_publication_pt) factory->handle; + pubsub_topicPublicationAddPublisherEP(pub, pubEP); + } + + celixThreadMutex_unlock(&admin->localPublicationsLock); + } + else{ + + celixThreadMutex_lock(&admin->externalPublicationsLock); + array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic); + if (ext_pub_list == NULL) { + arrayList_create(&ext_pub_list); + hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list); + } + + arrayList_add(ext_pub_list, pubEP); + + celixThreadMutex_unlock(&admin->externalPublicationsLock); + } + + /* Re-evaluate the pending subscriptions */ + celixThreadMutex_lock(&admin->pendingSubscriptionsLock); + + hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic); + if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them. + char* topic = (char*) hashMapEntry_getKey(pendingSub); + array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub); + int i; + for (i = 0; i < arrayList_size(pendingSubList); i++) { + pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i); + pubsubAdmin_addSubscription(admin, subEP); + } + hashMap_remove(admin->pendingSubscriptions, scope_topic); + arrayList_clear(pendingSubList); + arrayList_destroy(pendingSubList); + free(topic); + } + + celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); + + /* Connect the new publisher to the subscription for his topic, if there is any */ + celixThreadMutex_lock(&admin->subscriptionsLock); + + topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic); + if (sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) { + pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + + /* And check also for ANY subscription */ + topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC); + if (any_sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) { + pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + + free(scope_topic); + + celixThreadMutex_unlock(&admin->subscriptionsLock); + + + if (admin->verbose) { + printf("PSA_ZMQ: Added publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n", + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n", + properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY), + properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE)); + printf("PSA_UDPMC: \t [endpoint url = %s, own = %i]\n", + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL), + isOwn); + } + + + return status; + +} + +celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){ + celix_status_t status = CELIX_SUCCESS; + int count = 0; + + if (admin->verbose) { + printf("PSA_ZMQ: Removing publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n", + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n", + properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY), + properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY), + properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE)); + printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + + const char* fwUUID = NULL; + + bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); + if(fwUUID==NULL){ + fprintf(stderr, "ERROR PSA_ZMQ: Cannot retrieve fwUUID.\n"); + return CELIX_INVALID_BUNDLE_CONTEXT; + } + char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + + if(strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){ + + celixThreadMutex_lock(&admin->localPublicationsLock); + service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); + if(factory!=NULL){ + topic_publication_pt pub = (topic_publication_pt)factory->handle; + pubsub_topicPublicationRemovePublisherEP(pub,pubEP); + } + celixThreadMutex_unlock(&admin->localPublicationsLock); + + if(factory==NULL){ + /* Maybe the endpoint was pending */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){ + status = CELIX_ILLEGAL_STATE; + } + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + } + else{ + + celixThreadMutex_lock(&admin->externalPublicationsLock); + array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); + if(ext_pub_list!=NULL){ + int i; + bool found = false; + for(i=0;!found && i<arrayList_size(ext_pub_list);i++){ + pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); + found = pubsubEndpoint_equals(pubEP,p); + if (found){ + arrayList_remove(ext_pub_list,i); + } + } + // Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic) + for(i=0; i<arrayList_size(ext_pub_list);i++) { + pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); + if (strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),properties_get(p->endpoint_props, PUBSUB_ENDPOINT_URL)) == 0) { + count++; + } + } + + if(arrayList_size(ext_pub_list)==0){ + hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic); + char* topic = (char*)hashMapEntry_getKey(entry); + array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry); + hashMap_remove(admin->externalPublications,topic); + arrayList_destroy(list); + free(topic); + } + } + + celixThreadMutex_unlock(&admin->externalPublicationsLock); + } + + /* Check if this publisher was connected to one of our subscribers*/ + celixThreadMutex_lock(&admin->subscriptionsLock); + + topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); + if(sub!=NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL && count == 0){ + pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + + /* And check also for ANY subscription */ + topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); + if(any_sub!=NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL && count == 0){ + pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + + free(scope_topic); + celixThreadMutex_unlock(&admin->subscriptionsLock); + + return status; + +} + +celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *scope, char* topic){ + celix_status_t status = CELIX_SUCCESS; + + if (admin->verbose) { + printf("PSA_ZMQ: Closing all publications\n"); + } + + celixThreadMutex_lock(&admin->localPublicationsLock); + char *scope_topic = pubsubEndpoint_createScopeTopicKey(scope, topic); + hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic); + if(pubsvc_entry!=NULL){ + char* key = (char*)hashMapEntry_getKey(pubsvc_entry); + service_factory_pt factory= (service_factory_pt)hashMapEntry_getValue(pubsvc_entry); + topic_publication_pt pub = (topic_publication_pt)factory->handle; + status += pubsub_topicPublicationStop(pub); + disconnectTopicPubSubFromSerializer(admin, pub, true); + status += pubsub_topicPublicationDestroy(pub); + hashMap_remove(admin->localPublications,scope_topic); + free(key); + free(factory); + } + free(scope_topic); + celixThreadMutex_unlock(&admin->localPublicationsLock); + + return status; + +} + +celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic){ + celix_status_t status = CELIX_SUCCESS; + + if (admin->verbose) { + printf("PSA_ZMQ: Closing all subscriptions\n"); + } + + celixThreadMutex_lock(&admin->subscriptionsLock); + char *scope_topic = pubsubEndpoint_createScopeTopicKey(scope, topic); + hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic); + if(sub_entry!=NULL){ + char* topic = (char*)hashMapEntry_getKey(sub_entry); + + topic_subscription_pt ts = (topic_subscription_pt)hashMapEntry_getValue(sub_entry); + status += pubsub_topicSubscriptionStop(ts); + disconnectTopicPubSubFromSerializer(admin, ts, false); + status += pubsub_topicSubscriptionDestroy(ts); + hashMap_remove(admin->subscriptions,scope_topic); + free(topic); + + } + free(scope_topic); + celixThreadMutex_unlock(&admin->subscriptionsLock); + + return status; + +} + + +#ifndef ANDROID +static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip) { + celix_status_t status = CELIX_BUNDLE_EXCEPTION; + + struct ifaddrs *ifaddr, *ifa; + char host[NI_MAXHOST]; + + if (getifaddrs(&ifaddr) != -1) + { + for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next) + { + if (ifa->ifa_addr == NULL) + continue; + + if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) { + if (interface == NULL) { + *ip = strdup(host); + status = CELIX_SUCCESS; + } + else if (strcmp(ifa->ifa_name, interface) == 0) { + *ip = strdup(host); + status = CELIX_SUCCESS; + } + } + } + + freeifaddrs(ifaddr); + } + + return status; +} +#endif + +static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME)); + array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic); + if(pendingListPerTopic==NULL){ + arrayList_create(&pendingListPerTopic); + hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic); + } + arrayList_add(pendingListPerTopic,subEP); + free(scope_topic); + return status; +} + +celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service){ + /* Assumption: serializers are all available at startup. + * If a new (possibly better) serializer is installed and started, already created topic_publications/subscriptions will not be destroyed and recreated */ + + celix_status_t status = CELIX_SUCCESS; + int i=0; + + const char *serType = NULL; + serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType); + if (serType == NULL) { + fprintf(stderr, "WARNING PSA ZMQ: Serializer serviceReference %p has no %s property specified\n", reference, PUBSUB_SERIALIZER_TYPE_KEY); + return CELIX_SERVICE_EXCEPTION; + } + + pubsub_admin_pt admin = (pubsub_admin_pt)handle; + celixThreadMutex_lock(&admin->serializerListLock); + arrayList_add(admin->serializerList, reference); + celixThreadMutex_unlock(&admin->serializerListLock); + + /* Now let's re-evaluate the pending */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + + for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){ + pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i); + pubsub_serializer_service_t *best_serializer = NULL; + pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL); + if(best_serializer != NULL){ /* Finally we have a valid serializer! */ + pubsubAdmin_addSubscription(admin, ep); + } + } + + for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){ + pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i); + pubsub_serializer_service_t *best_serializer = NULL; + pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL); + if(best_serializer != NULL){ /* Finally we have a valid serializer! */ + pubsubAdmin_addPublication(admin, ep); + } + } + + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + + if (admin->verbose) { + printf("PSA_ZMQ: %s serializer added\n", serType); + } + + return status; +} + +celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){ + + pubsub_admin_pt admin = (pubsub_admin_pt)handle; + int i=0, j=0; + const char *serType = NULL; + + serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType); + if (serType == NULL) { + printf("WARNING PSA ZMQ: Serializer serviceReference %p has no %s property specified\n", reference, PUBSUB_SERIALIZER_TYPE_KEY); + return CELIX_SERVICE_EXCEPTION; + } + + celixThreadMutex_lock(&admin->serializerListLock); + /* Remove the serializer from the list */ + arrayList_removeElement(admin->serializerList, reference); + celixThreadMutex_unlock(&admin->serializerListLock); + + + celixThreadMutex_lock(&admin->usedSerializersLock); + array_list_pt topicPubList = (array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service); + array_list_pt topicSubList = (array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service); + celixThreadMutex_unlock(&admin->usedSerializersLock); + + /* Now destroy the topicPublications, but first put back the pubsub_endpoints back to the noSerializer pending list */ + if(topicPubList!=NULL){ + for(i=0;i<arrayList_size(topicPubList);i++){ + topic_publication_pt topicPub = (topic_publication_pt)arrayList_get(topicPubList,i); + /* Stop the topic publication */ + pubsub_topicPublicationStop(topicPub); + /* Get the endpoints that are going to be orphan */ + array_list_pt pubList = pubsub_topicPublicationGetPublisherList(topicPub); + for(j=0;j<arrayList_size(pubList);j++){ + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubList,j); + /* Remove the publication */ + pubsubAdmin_removePublication(admin, pubEP); + /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */ + if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){ + properties_unset(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL); + } + /* Add the orphan endpoint to the noSerializer pending list */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerPublications,pubEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + arrayList_destroy(pubList); + + /* Cleanup also the localPublications hashmap*/ + celixThreadMutex_lock(&admin->localPublicationsLock); + hash_map_iterator_pt iter = hashMapIterator_create(admin->localPublications); + char *key = NULL; + service_factory_pt factory = NULL; + while(hashMapIterator_hasNext(iter)){ + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + factory = (service_factory_pt)hashMapEntry_getValue(entry); + topic_publication_pt pub = (topic_publication_pt)factory->handle; + if(pub==topicPub){ + key = (char*)hashMapEntry_getKey(entry); + break; + } + } + hashMapIterator_destroy(iter); + if(key!=NULL){ + hashMap_remove(admin->localPublications, key); + free(factory); + free(key); + } + celixThreadMutex_unlock(&admin->localPublicationsLock); + + /* Finally destroy the topicPublication */ + pubsub_topicPublicationDestroy(topicPub); + } + arrayList_destroy(topicPubList); + } + + /* Now destroy the topicSubscriptions, but first put back the pubsub_endpoints back to the noSerializer pending list */ + if(topicSubList!=NULL){ + for(i=0;i<arrayList_size(topicSubList);i++){ + topic_subscription_pt topicSub = (topic_subscription_pt)arrayList_get(topicSubList,i); + /* Stop the topic subscription */ + pubsub_topicSubscriptionStop(topicSub); + /* Get the endpoints that are going to be orphan */ + array_list_pt subList = pubsub_topicSubscriptionGetSubscribersList(topicSub); + for(j=0;j<arrayList_size(subList);j++){ + pubsub_endpoint_pt subEP = (pubsub_endpoint_pt)arrayList_get(subList,j); + /* Remove the subscription */ + pubsubAdmin_removeSubscription(admin, subEP); + /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */ + if(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){ + properties_unset(subEP->endpoint_props, PUBSUB_ENDPOINT_URL); + } + /* Add the orphan endpoint to the noSerializer pending list */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerSubscriptions,subEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + + /* Cleanup also the subscriptions hashmap*/ + celixThreadMutex_lock(&admin->subscriptionsLock); + hash_map_iterator_pt iter = hashMapIterator_create(admin->subscriptions); + char *key = NULL; + while(hashMapIterator_hasNext(iter)){ + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + topic_subscription_pt sub = (topic_subscription_pt)hashMapEntry_getValue(entry); + if(sub==topicSub){ + key = (char*)hashMapEntry_getKey(entry); + break; + } + } + hashMapIterator_destroy(iter); + if(key!=NULL){ + hashMap_remove(admin->subscriptions, key); + free(key); + } + celixThreadMutex_unlock(&admin->subscriptionsLock); + + /* Finally destroy the topicSubscription */ + pubsub_topicSubscriptionDestroy(topicSub); + } + arrayList_destroy(topicSubList); + } + + + if (admin->verbose) { + printf("PSA_ZMQ: %s serializer removed\n", serType); + } + + return CELIX_SUCCESS; +} + +celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){ + celix_status_t status = CELIX_SUCCESS; + + const char *fwUuid = NULL; + bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUuid); + if (fwUuid == NULL) { + return CELIX_ILLEGAL_STATE; + } + + celixThreadMutex_lock(&admin->serializerListLock); + status = pubsub_admin_match(endpoint, PSA_ZMQ_PUBSUB_ADMIN_TYPE, fwUuid, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList, score); + celixThreadMutex_unlock(&admin->serializerListLock); + + return status; +} + +/* This one recall the same logic as in the match function */ +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **svcOut, const char **serTypeOut) { + celix_status_t status = CELIX_SUCCESS; + + pubsub_serializer_service_t *serSvc = NULL; + service_reference_pt svcRef = NULL; + + celixThreadMutex_lock(&admin->serializerListLock); + status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, &svcRef); + celixThreadMutex_unlock(&admin->serializerListLock); + + if (svcRef != NULL) { + bundleContext_getService(admin->bundle_context, svcRef, (void**)&serSvc); + bundleContext_ungetService(admin->bundle_context, svcRef, NULL); //TODO, FIXME this should not be done this way. only unget if the service is not used any more + if (serTypeOut != NULL) { + serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY, serTypeOut); + } + } + + + *svcOut = serSvc; + + return status; +} + +static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){ + + celixThreadMutex_lock(&admin->usedSerializersLock); + + hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer; + array_list_pt list = (array_list_pt)hashMap_get(map,serializer); + if(list==NULL){ + arrayList_create(&list); + hashMap_put(map,serializer,list); + } + arrayList_add(list,topicPubSub); + + celixThreadMutex_unlock(&admin->usedSerializersLock); + +} + +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){ + + celixThreadMutex_lock(&admin->usedSerializersLock); + + hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer; + hash_map_iterator_pt iter = hashMapIterator_create(map); + while(hashMapIterator_hasNext(iter)){ + array_list_pt list = (array_list_pt)hashMapIterator_nextValue(iter); + if(arrayList_removeElement(list, topicPubSub)){ //Found it! + break; + } + } + hashMapIterator_destroy(iter); + + celixThreadMutex_unlock(&admin->usedSerializersLock); + +} +