http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/src/topic_subscription.c new file mode 100644 index 0000000..0e7a794 --- /dev/null +++ b/pubsub/pubsub_admin_zmq/src/topic_subscription.c @@ -0,0 +1,732 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * topic_subscription.c + * + * \date Oct 2, 2015 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#include "topic_subscription.h" +#include <czmq.h> +/* The following undefs prevent the collision between: + * - sys/syslog.h (which is included within czmq) + * - celix/dfi/dfi_log_util.h + */ +#undef LOG_DEBUG +#undef LOG_WARNING +#undef LOG_INFO +#undef LOG_WARNING + +#include <string.h> +#include <stdlib.h> +#include <signal.h> + +#include "utils.h" +#include "celix_errno.h" +#include "constants.h" +#include "version.h" + +#include "subscriber.h" +#include "publisher.h" +#include "pubsub_utils.h" + +#ifdef BUILD_WITH_ZMQ_SECURITY +#include "zmq_crypto.h" + +#define MAX_CERT_PATH_LENGTH 512 +#endif + +#define POLL_TIMEOUT 250 +#define ZMQ_POLL_TIMEOUT_MS_ENV "ZMQ_POLL_TIMEOUT_MS" + +struct topic_subscription{ + + zsock_t* zmq_socket; + zcert_t * zmq_cert; + zcert_t * zmq_pub_cert; + pthread_mutex_t socket_lock; + service_tracker_pt tracker; + array_list_pt sub_ep_list; + celix_thread_t recv_thread; + bool running; + celix_thread_mutex_t ts_lock; + bundle_context_pt context; + + pubsub_serializer_service_t *serializer; + + hash_map_pt servicesMap; // key = service, value = msg types map + + celix_thread_mutex_t pendingConnections_lock; + array_list_pt pendingConnections; + + array_list_pt pendingDisconnections; + celix_thread_mutex_t pendingDisconnections_lock; + + unsigned int nrSubscribers; +}; + +typedef struct complete_zmq_msg{ + zframe_t* header; + zframe_t* payload; +}* complete_zmq_msg_pt; + +typedef struct mp_handle{ + hash_map_pt svc_msg_db; + hash_map_pt rcv_msg_map; +}* mp_handle_pt; + +typedef struct msg_map_entry{ + bool retain; + void* msgInst; +}* msg_map_entry_pt; + +static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service); +static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service); +static void* zmq_recv_thread_func(void* arg); +static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr); +static void sigusr1_sighandler(int signo); +static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId); +static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part); +static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list); +static void destroy_mp_handle(mp_handle_pt mp_handle); +static void connectPendingPublishers(topic_subscription_pt sub); +static void disconnectPendingPublishers(topic_subscription_pt sub); + +celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* scope, char* topic, pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){ + celix_status_t status = CELIX_SUCCESS; + +#ifdef BUILD_WITH_ZMQ_SECURITY + char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context); + if (keys_bundle_dir == NULL){ + return CELIX_SERVICE_EXCEPTION; + } + + const char* keys_file_path = NULL; + const char* keys_file_name = NULL; + bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path); + bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name); + + char sub_cert_path[MAX_CERT_PATH_LENGTH]; + char pub_cert_path[MAX_CERT_PATH_LENGTH]; + + //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/subscriber/private/sub_{topic}.key.enc" + snprintf(sub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/subscriber/private/sub_%s.key.enc", keys_bundle_dir, topic); + snprintf(pub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/public/pub_%s.pub", keys_bundle_dir, topic); + free(keys_bundle_dir); + + printf("PSA_ZMQ_PSA_ZMQ_TS: Loading subscriber key '%s'\n", sub_cert_path); + printf("PSA_ZMQ_PSA_ZMQ_TS: Loading publisher key '%s'\n", pub_cert_path); + + zcert_t* sub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, sub_cert_path); + if (sub_cert == NULL){ + printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", sub_cert_path); + return CELIX_SERVICE_EXCEPTION; + } + + zcert_t* pub_cert = zcert_load(pub_cert_path); + if (pub_cert == NULL){ + zcert_destroy(&sub_cert); + printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", pub_cert_path); + return CELIX_SERVICE_EXCEPTION; + } + + const char* pub_key = zcert_public_txt(pub_cert); +#endif + + zsock_t* zmq_s = zsock_new (ZMQ_SUB); + if(zmq_s==NULL){ +#ifdef BUILD_WITH_ZMQ_SECURITY + zcert_destroy(&sub_cert); + zcert_destroy(&pub_cert); +#endif + + return CELIX_SERVICE_EXCEPTION; + } + +#ifdef BUILD_WITH_ZMQ_SECURITY + zcert_apply (sub_cert, zmq_s); + zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber +#endif + + if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC)==0){ + zsock_set_subscribe (zmq_s, ""); + } + else{ + zsock_set_subscribe (zmq_s, topic); + } + + topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts)); + ts->context = bundle_context; + ts->zmq_socket = zmq_s; + ts->running = false; + ts->nrSubscribers = 0; + ts->serializer = best_serializer; + +#ifdef BUILD_WITH_ZMQ_SECURITY + ts->zmq_cert = sub_cert; + ts->zmq_pub_cert = pub_cert; +#endif + + celixThreadMutex_create(&ts->socket_lock, NULL); + celixThreadMutex_create(&ts->ts_lock,NULL); + arrayList_create(&ts->sub_ep_list); + ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL); + + arrayList_create(&ts->pendingConnections); + arrayList_create(&ts->pendingDisconnections); + celixThreadMutex_create(&ts->pendingConnections_lock, NULL); + celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL); + + char filter[128]; + memset(filter,0,128); + if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT,scope,strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) { + // default scope, means that subscriber has not defined a scope property + snprintf(filter, 128, "(&(%s=%s)(%s=%s))", + (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, + PUBSUB_SUBSCRIBER_TOPIC,topic); + + } else { + snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))", + (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, + PUBSUB_SUBSCRIBER_TOPIC,topic, + PUBSUB_SUBSCRIBER_SCOPE,scope); + } + service_tracker_customizer_pt customizer = NULL; + status += serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer); + status += serviceTracker_createWithFilter(bundle_context, filter, customizer, &ts->tracker); + + struct sigaction actions; + memset(&actions, 0, sizeof(actions)); + sigemptyset(&actions.sa_mask); + actions.sa_flags = 0; + actions.sa_handler = sigusr1_sighandler; + + sigaction(SIGUSR1,&actions,NULL); + + *out=ts; + + return status; +} + +celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->ts_lock); + + ts->running = false; + serviceTracker_destroy(ts->tracker); + arrayList_clear(ts->sub_ep_list); + arrayList_destroy(ts->sub_ep_list); + /* TODO: Destroy all the serializer maps? */ + hashMap_destroy(ts->servicesMap,false,false); + + celixThreadMutex_lock(&ts->pendingConnections_lock); + arrayList_destroy(ts->pendingConnections); + celixThreadMutex_unlock(&ts->pendingConnections_lock); + celixThreadMutex_destroy(&ts->pendingConnections_lock); + + celixThreadMutex_lock(&ts->pendingDisconnections_lock); + arrayList_destroy(ts->pendingDisconnections); + celixThreadMutex_unlock(&ts->pendingDisconnections_lock); + celixThreadMutex_destroy(&ts->pendingDisconnections_lock); + + celixThreadMutex_unlock(&ts->ts_lock); + + celixThreadMutex_lock(&ts->socket_lock); + zsock_destroy(&(ts->zmq_socket)); +#ifdef BUILD_WITH_ZMQ_SECURITY + zcert_destroy(&(ts->zmq_cert)); + zcert_destroy(&(ts->zmq_pub_cert)); +#endif + celixThreadMutex_unlock(&ts->socket_lock); + celixThreadMutex_destroy(&ts->socket_lock); + + celixThreadMutex_destroy(&ts->ts_lock); + + free(ts); + + return status; +} + +celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){ + celix_status_t status = CELIX_SUCCESS; + + status = serviceTracker_open(ts->tracker); + + ts->running = true; + + if(status==CELIX_SUCCESS){ + status=celixThread_create(&ts->recv_thread,NULL,zmq_recv_thread_func,ts); + } + + return status; +} + +celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){ + celix_status_t status = CELIX_SUCCESS; + + ts->running = false; + + pthread_kill(ts->recv_thread.thread,SIGUSR1); + + celixThread_join(ts->recv_thread,NULL); + + status = serviceTracker_close(ts->tracker); + + return status; +} + +celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL){ + celix_status_t status = CELIX_SUCCESS; + celixThreadMutex_lock(&ts->socket_lock); + if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket,"%s",pubURL) != 0){ + status = CELIX_SERVICE_EXCEPTION; + } + celixThreadMutex_unlock(&ts->socket_lock); + + return status; +} + +celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { + celix_status_t status = CELIX_SUCCESS; + char *url = strdup(pubURL); + celixThreadMutex_lock(&ts->pendingConnections_lock); + arrayList_add(ts->pendingConnections, url); + celixThreadMutex_unlock(&ts->pendingConnections_lock); + return status; +} + +celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { + celix_status_t status = CELIX_SUCCESS; + char *url = strdup(pubURL); + celixThreadMutex_lock(&ts->pendingDisconnections_lock); + arrayList_add(ts->pendingDisconnections, url); + celixThreadMutex_unlock(&ts->pendingDisconnections_lock); + return status; +} + +celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->socket_lock); + if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket,"%s",pubURL) != 0){ + status = CELIX_SERVICE_EXCEPTION; + } + celixThreadMutex_unlock(&ts->socket_lock); + + return status; +} + +celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->ts_lock); + arrayList_add(ts->sub_ep_list,subEP); + celixThreadMutex_unlock(&ts->ts_lock); + + return status; + +} + +celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) { + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->ts_lock); + ts->nrSubscribers++; + celixThreadMutex_unlock(&ts->ts_lock); + + return status; +} + +celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->ts_lock); + arrayList_removeElement(ts->sub_ep_list,subEP); + celixThreadMutex_unlock(&ts->ts_lock); + + return status; +} + +celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) { + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->ts_lock); + ts->nrSubscribers--; + celixThreadMutex_unlock(&ts->ts_lock); + + return status; +} + +unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) { + return ts->nrSubscribers; +} + +array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub){ + return sub->sub_ep_list; +} + +static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){ + celix_status_t status = CELIX_SUCCESS; + topic_subscription_pt ts = handle; + + celixThreadMutex_lock(&ts->ts_lock); + if (!hashMap_containsKey(ts->servicesMap, service)) { + bundle_pt bundle = NULL; + hash_map_pt msgTypes = NULL; + + serviceReference_getBundle(reference, &bundle); + + if(ts->serializer != NULL && bundle!=NULL){ + ts->serializer->createSerializerMap(ts->serializer->handle,bundle,&msgTypes); + if(msgTypes != NULL){ + hashMap_put(ts->servicesMap, service, msgTypes); + printf("PSA_ZMQ_TS: New subscriber registered.\n"); + } + } + else{ + printf("PSA_ZMQ_TS: Cannot register new subscriber.\n"); + status = CELIX_SERVICE_EXCEPTION; + } + } + celixThreadMutex_unlock(&ts->ts_lock); + + return status; +} + +static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){ + celix_status_t status = CELIX_SUCCESS; + topic_subscription_pt ts = handle; + + celixThreadMutex_lock(&ts->ts_lock); + if (hashMap_containsKey(ts->servicesMap, service)) { + hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service); + if(msgTypes!=NULL && ts->serializer!=NULL){ + ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes); + printf("PSA_ZMQ_TS: Subscriber unregistered.\n"); + } + else{ + printf("PSA_ZMQ_TS: Cannot unregister subscriber.\n"); + status = CELIX_SERVICE_EXCEPTION; + } + } + celixThreadMutex_unlock(&ts->ts_lock); + + return status; +} + + +static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){ + + pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header); + + hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap); + while (hashMapIterator_hasNext(iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry); + hash_map_pt msgTypes = hashMapEntry_getValue(entry); + + pubsub_msg_serializer_t *msgSer = hashMap_get(msgTypes,(void*)(uintptr_t )first_msg_hdr->type); + if (msgSer == NULL) { + printf("PSA_ZMQ_TS: Primary message %d not supported. NOT sending any part of the whole message.\n",first_msg_hdr->type); + } + else{ + void *msgInst = NULL; + bool validVersion = checkVersion(msgSer->msgVersion,first_msg_hdr); + + if(validVersion){ + + celix_status_t status = msgSer->deserialize(msgSer, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), 0, &msgInst); + + if (status == CELIX_SUCCESS) { + bool release = true; + mp_handle_pt mp_handle = create_mp_handle(msgTypes,msg_list); + pubsub_multipart_callbacks_t mp_callbacks; + mp_callbacks.handle = mp_handle; + mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType; + mp_callbacks.getMultipart = pubsub_getMultipart; + subsvc->receive(subsvc->handle, msgSer->msgName, first_msg_hdr->type, msgInst, &mp_callbacks, &release); + + if(release){ + msgSer->freeMsg(msgSer,msgInst); // pubsubSerializer_freeMsg(msgType, msgInst); + } + if(mp_handle!=NULL){ + destroy_mp_handle(mp_handle); + } + } + else{ + printf("PSA_ZMQ_TS: Cannot deserialize msgType %s.\n",msgSer->msgName); + } + + } + else{ + int major=0,minor=0; + version_getMajor(msgSer->msgVersion,&major); + version_getMinor(msgSer->msgVersion,&minor); + printf("PSA_ZMQ_TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n", + msgSer->msgName,major,minor,first_msg_hdr->major,first_msg_hdr->minor); + } + + } + } + hashMapIterator_destroy(iter); + + int i = 0; + for(;i<arrayList_size(msg_list);i++){ + complete_zmq_msg_pt c_msg = arrayList_get(msg_list,i); + zframe_destroy(&(c_msg->header)); + zframe_destroy(&(c_msg->payload)); + free(c_msg); + } + + arrayList_destroy(msg_list); + +} + +static void* zmq_recv_thread_func(void * arg) { + topic_subscription_pt sub = (topic_subscription_pt) arg; + + while (sub->running) { + + celixThreadMutex_lock(&sub->socket_lock); + + zframe_t* headerMsg = zframe_recv(sub->zmq_socket); + if (headerMsg == NULL) { + if (errno == EINTR) { + //It means we got a signal and we have to exit... + printf("PSA_ZMQ_TS: header_recv thread for topic got a signal and will exit.\n"); + } else { + perror("PSA_ZMQ_TS: header_recv thread"); + } + } + else { + + pubsub_msg_header_pt hdr = (pubsub_msg_header_pt) zframe_data(headerMsg); + + if (zframe_more(headerMsg)) { + + zframe_t* payloadMsg = zframe_recv(sub->zmq_socket); + if (payloadMsg == NULL) { + if (errno == EINTR) { + //It means we got a signal and we have to exit... + printf("PSA_ZMQ_TS: payload_recv thread for topic got a signal and will exit.\n"); + } else { + perror("PSA_ZMQ_TS: payload_recv"); + } + zframe_destroy(&headerMsg); + } else { + + //Let's fetch all the messages from the socket + array_list_pt msg_list = NULL; + arrayList_create(&msg_list); + complete_zmq_msg_pt firstMsg = calloc(1, sizeof(struct complete_zmq_msg)); + firstMsg->header = headerMsg; + firstMsg->payload = payloadMsg; + arrayList_add(msg_list, firstMsg); + + bool more = zframe_more(payloadMsg); + while (more) { + + zframe_t* h_msg = zframe_recv(sub->zmq_socket); + if (h_msg == NULL) { + if (errno == EINTR) { + //It means we got a signal and we have to exit... + printf("PSA_ZMQ_TS: h_recv thread for topic got a signal and will exit.\n"); + } else { + perror("PSA_ZMQ_TS: h_recv"); + } + break; + } + + zframe_t* p_msg = zframe_recv(sub->zmq_socket); + if (p_msg == NULL) { + if (errno == EINTR) { + //It means we got a signal and we have to exit... + printf("PSA_ZMQ_TS: p_recv thread for topic got a signal and will exit.\n"); + } else { + perror("PSA_ZMQ_TS: p_recv"); + } + zframe_destroy(&h_msg); + break; + } + + complete_zmq_msg_pt c_msg = calloc(1, sizeof(struct complete_zmq_msg)); + c_msg->header = h_msg; + c_msg->payload = p_msg; + arrayList_add(msg_list, c_msg); + + if (!zframe_more(p_msg)) { + more = false; + } + } + + celixThreadMutex_lock(&sub->ts_lock); + process_msg(sub, msg_list); + celixThreadMutex_unlock(&sub->ts_lock); + + } + + } //zframe_more(headerMsg) + else { + free(headerMsg); + printf("PSA_ZMQ_TS: received message %u for topic %s without payload!\n", hdr->type, hdr->topic); + } + + } // headerMsg != NULL + celixThreadMutex_unlock(&sub->socket_lock); + connectPendingPublishers(sub); + disconnectPendingPublishers(sub); + } // while + + return NULL; +} + +static void connectPendingPublishers(topic_subscription_pt sub) { + celixThreadMutex_lock(&sub->pendingConnections_lock); + while(!arrayList_isEmpty(sub->pendingConnections)) { + char * pubEP = arrayList_remove(sub->pendingConnections, 0); + pubsub_topicSubscriptionConnectPublisher(sub, pubEP); + free(pubEP); + } + celixThreadMutex_unlock(&sub->pendingConnections_lock); +} + +static void disconnectPendingPublishers(topic_subscription_pt sub) { + celixThreadMutex_lock(&sub->pendingDisconnections_lock); + while(!arrayList_isEmpty(sub->pendingDisconnections)) { + char * pubEP = arrayList_remove(sub->pendingDisconnections, 0); + pubsub_topicSubscriptionDisconnectPublisher(sub, pubEP); + free(pubEP); + } + celixThreadMutex_unlock(&sub->pendingDisconnections_lock); +} + +static void sigusr1_sighandler(int signo){ + printf("PSA_ZMQ_TS: Topic subscription being shut down...\n"); + return; +} + +static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){ + bool check=false; + int major=0,minor=0; + + if(msgVersion!=NULL){ + version_getMajor(msgVersion,&major); + version_getMinor(msgVersion,&minor); + if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */ + check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */ + } + } + + return check; +} + +static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){ + *msgTypeId = utils_stringHash(msgType); + return 0; +} + +static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part){ + + if(handle==NULL){ + *part = NULL; + return -1; + } + + mp_handle_pt mp_handle = (mp_handle_pt)handle; + msg_map_entry_pt entry = hashMap_get(mp_handle->rcv_msg_map, (void*)(uintptr_t) msgTypeId); + if(entry!=NULL){ + entry->retain = retain; + *part = entry->msgInst; + } + else{ + printf("TP: getMultipart cannot find msg '%u'\n",msgTypeId); + *part=NULL; + return -2; + } + + return 0; + +} + +static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list){ + + if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart message + return NULL; + } + + mp_handle_pt mp_handle = calloc(1,sizeof(struct mp_handle)); + mp_handle->svc_msg_db = svc_msg_db; + mp_handle->rcv_msg_map = hashMap_create(NULL, NULL, NULL, NULL); + + int i=1; //We skip the first message, it will be handle differently + for(;i<arrayList_size(rcv_msg_list);i++){ + complete_zmq_msg_pt c_msg = (complete_zmq_msg_pt)arrayList_get(rcv_msg_list,i); + pubsub_msg_header_pt header = (pubsub_msg_header_pt)zframe_data(c_msg->header); + + pubsub_msg_serializer_t* msgSer = hashMap_get(svc_msg_db, (void*)(uintptr_t)(header->type)); + + if (msgSer!= NULL) { + void *msgInst = NULL; + + bool validVersion = checkVersion(msgSer->msgVersion,header); + + if(validVersion){ + celix_status_t status = msgSer->deserialize(msgSer, (const void*)zframe_data(c_msg->payload), 0, &msgInst); + + if(status == CELIX_SUCCESS){ + msg_map_entry_pt entry = calloc(1,sizeof(struct msg_map_entry)); + entry->msgInst = msgInst; + hashMap_put(mp_handle->rcv_msg_map, (void*)(uintptr_t)header->type,entry); + } + } + } + } + + return mp_handle; + +} + +static void destroy_mp_handle(mp_handle_pt mp_handle){ + + hash_map_iterator_pt iter = hashMapIterator_create(mp_handle->rcv_msg_map); + while(hashMapIterator_hasNext(iter)){ + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + unsigned int msgId = (unsigned int)(uintptr_t)hashMapEntry_getKey(entry); + msg_map_entry_pt msgEntry = hashMapEntry_getValue(entry); + pubsub_msg_serializer_t* msgSer = hashMap_get(mp_handle->svc_msg_db, (void*)(uintptr_t)msgId); + + if(msgSer!=NULL){ + if(!msgEntry->retain){ + msgSer->freeMsg(msgSer->handle,msgEntry->msgInst); + } + } + else{ + printf("PSA_ZMQ_TS: ERROR: Cannot find messageSerializer for msg %u, so cannot destroy it!\n",msgId); + } + + free(msgEntry); + } + hashMapIterator_destroy(iter); + + hashMap_destroy(mp_handle->rcv_msg_map,false,false); + free(mp_handle); +}
http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/src/topic_subscription.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/src/topic_subscription.h b/pubsub/pubsub_admin_zmq/src/topic_subscription.h new file mode 100644 index 0000000..7267103 --- /dev/null +++ b/pubsub/pubsub_admin_zmq/src/topic_subscription.h @@ -0,0 +1,60 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * topic_subscription.h + * + * \date Sep 22, 2015 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#ifndef TOPIC_SUBSCRIPTION_H_ +#define TOPIC_SUBSCRIPTION_H_ + +#include "celix_threads.h" +#include "array_list.h" +#include "celixbool.h" +#include "service_tracker.h" + +#include "pubsub_endpoint.h" +#include "pubsub_common.h" +#include "pubsub_serializer.h" + +typedef struct topic_subscription* topic_subscription_pt; + +celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,char* scope, char* topic, pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out); +celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts); +celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts); +celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts); + +celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL); +celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL); + +celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL); +celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL); + +celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); +celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); + +array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub); +celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription); +celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription); +unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription); + +#endif /*TOPIC_SUBSCRIPTION_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/src/zmq_crypto.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/src/zmq_crypto.c b/pubsub/pubsub_admin_zmq/src/zmq_crypto.c new file mode 100644 index 0000000..fe444bd --- /dev/null +++ b/pubsub/pubsub_admin_zmq/src/zmq_crypto.c @@ -0,0 +1,281 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * zmq_crypto.c + * + * \date Dec 2, 2016 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#include "zmq_crypto.h" + +#include <zmq.h> +#include <openssl/conf.h> +#include <openssl/evp.h> +#include <openssl/err.h> + +#include <string.h> + +#define MAX_FILE_PATH_LENGTH 512 +#define ZMQ_KEY_LENGTH 40 +#define AES_KEY_LENGTH 32 +#define AES_IV_LENGTH 16 + +#define KEY_TO_GET "aes_key" +#define IV_TO_GET "aes_iv" + +static char* read_file_content(char* filePath, char* fileName); +static void parse_key_lines(char *keysBuffer, char **key, char **iv); +static void parse_key_line(char *line, char **key, char **iv); +static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey); + +/** + * Return a valid zcert_t from an encoded file + * Caller is responsible for freeing by calling zcert_destroy(zcert** cert); + */ +zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path) +{ + + if (keysFilePath == NULL){ + keysFilePath = DEFAULT_KEYS_FILE_PATH; + } + + if (keysFileName == NULL){ + keysFileName = DEFAULT_KEYS_FILE_NAME; + } + + char* keys_data = read_file_content(keysFilePath, keysFileName); + if (keys_data == NULL){ + return NULL; + } + + char *key = NULL; + char *iv = NULL; + parse_key_lines(keys_data, &key, &iv); + free(keys_data); + + if (key == NULL || iv == NULL){ + free(key); + free(iv); + + printf("CRYPTO: Loading AES key and/or AES iv failed!\n"); + return NULL; + } + + //At this point, we know an aes key and iv are stored and loaded + + // generate sha256 hashes + unsigned char key_digest[EVP_MAX_MD_SIZE]; + unsigned char iv_digest[EVP_MAX_MD_SIZE]; + generate_sha256_hash((char*) key, key_digest); + generate_sha256_hash((char*) iv, iv_digest); + + zchunk_t* encoded_secret = zchunk_slurp (file_path, 0); + if (encoded_secret == NULL){ + free(key); + free(iv); + + return NULL; + } + + int encoded_secret_size = (int) zchunk_size (encoded_secret); + char* encoded_secret_data = zchunk_strdup(encoded_secret); + zchunk_destroy (&encoded_secret); + + // Decryption of data + int decryptedtext_len; + unsigned char decryptedtext[encoded_secret_size]; + decryptedtext_len = decrypt((unsigned char *) encoded_secret_data, encoded_secret_size, key_digest, iv_digest, decryptedtext); + decryptedtext[decryptedtext_len] = '\0'; + + EVP_cleanup(); + + free(encoded_secret_data); + free(key); + free(iv); + + // The public and private keys are retrieved + char* public_text = NULL; + char* secret_text = NULL; + + extract_keys_from_buffer(decryptedtext, decryptedtext_len, &public_text, &secret_text); + + byte public_key [32] = { 0 }; + byte secret_key [32] = { 0 }; + + zmq_z85_decode (public_key, public_text); + zmq_z85_decode (secret_key, secret_text); + + zcert_t* cert_loaded = zcert_new_from(public_key, secret_key); + + free(public_text); + free(secret_text); + + return cert_loaded; +} + +int generate_sha256_hash(char* text, unsigned char* digest) +{ + unsigned int digest_len; + + EVP_MD_CTX * mdctx = EVP_MD_CTX_new(); + EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL); + EVP_DigestUpdate(mdctx, text, strlen(text)); + EVP_DigestFinal_ex(mdctx, digest, &digest_len); + EVP_MD_CTX_free(mdctx); + + return digest_len; +} + +int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext) +{ + int len; + int plaintext_len; + + EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new(); + + EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv); + EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len); + plaintext_len = len; + EVP_DecryptFinal_ex(ctx, plaintext + len, &len); + plaintext_len += len; + + EVP_CIPHER_CTX_free(ctx); + + return plaintext_len; +} + +/** + * Caller is responsible for freeing the returned value + */ +static char* read_file_content(char* filePath, char* fileName){ + + char fileNameWithPath[MAX_FILE_PATH_LENGTH]; + snprintf(fileNameWithPath, MAX_FILE_PATH_LENGTH, "%s/%s", filePath, fileName); + int rc = 0; + + if (!zsys_file_exists(fileNameWithPath)){ + printf("CRYPTO: Keys file '%s' doesn't exist!\n", fileNameWithPath); + return NULL; + } + + zfile_t* keys_file = zfile_new (filePath, fileName); + rc = zfile_input (keys_file); + if (rc != 0){ + zfile_destroy(&keys_file); + printf("CRYPTO: Keys file '%s' not readable!\n", fileNameWithPath); + return NULL; + } + + ssize_t keys_file_size = zsys_file_size (fileNameWithPath); + zchunk_t* keys_chunk = zfile_read (keys_file, keys_file_size, 0); + if (keys_chunk == NULL){ + zfile_close(keys_file); + zfile_destroy(&keys_file); + printf("CRYPTO: Can't read file '%s'!\n", fileNameWithPath); + return NULL; + } + + char* keys_data = zchunk_strdup(keys_chunk); + zchunk_destroy(&keys_chunk); + zfile_close(keys_file); + zfile_destroy (&keys_file); + + return keys_data; +} + +static void parse_key_lines(char *keysBuffer, char **key, char **iv){ + char *line = NULL, *saveLinePointer = NULL; + + bool firstTime = true; + do { + if (firstTime){ + line = strtok_r(keysBuffer, "\n", &saveLinePointer); + firstTime = false; + }else { + line = strtok_r(NULL, "\n", &saveLinePointer); + } + + if (line == NULL){ + break; + } + + parse_key_line(line, key, iv); + + } while((*key == NULL || *iv == NULL) && line != NULL); + +} + +static void parse_key_line(char *line, char **key, char **iv){ + char *detectedKey = NULL, *detectedValue= NULL; + + char* sep_at = strchr(line, ':'); + if (sep_at == NULL){ + return; + } + + *sep_at = '\0'; // overwrite first separator, creating two strings. + detectedKey = line; + detectedValue = sep_at + 1; + + if (detectedKey == NULL || detectedValue == NULL){ + return; + } + if (detectedKey[0] == '\0' || detectedValue[0] == '\0'){ + return; + } + + if (*key == NULL && strcmp(detectedKey, KEY_TO_GET) == 0){ + *key = strndup(detectedValue, AES_KEY_LENGTH); + } else if (*iv == NULL && strcmp(detectedKey, IV_TO_GET) == 0){ + *iv = strndup(detectedValue, AES_IV_LENGTH); + } +} + +static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey) { + // Load decrypted text buffer + zchunk_t* secret_decrypted = zchunk_new(input, inputlen); + if (secret_decrypted == NULL){ + printf("CRYPTO: Failed to create zchunk\n"); + return; + } + + zconfig_t* secret_config = zconfig_chunk_load (secret_decrypted); + zchunk_destroy (&secret_decrypted); + if (secret_config == NULL){ + printf("CRYPTO: Failed to create zconfig\n"); + return; + } + + // Extract public and secret key from text buffer + char* public_text = zconfig_get (secret_config, "/curve/public-key", NULL); + char* secret_text = zconfig_get (secret_config, "/curve/secret-key", NULL); + + if (public_text == NULL || secret_text == NULL){ + zconfig_destroy(&secret_config); + printf("CRYPTO: Loading public / secret key from text-buffer failed!\n"); + return; + } + + *publicKey = strndup(public_text, ZMQ_KEY_LENGTH + 1); + *secretKey = strndup(secret_text, ZMQ_KEY_LENGTH + 1); + + zconfig_destroy(&secret_config); +} http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/src/zmq_crypto.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/src/zmq_crypto.h b/pubsub/pubsub_admin_zmq/src/zmq_crypto.h new file mode 100644 index 0000000..f1a990f --- /dev/null +++ b/pubsub/pubsub_admin_zmq/src/zmq_crypto.h @@ -0,0 +1,41 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * zmq_crypto.h + * + * \date Dec 2, 2016 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#ifndef ZMQ_CRYPTO_H_ +#define ZMQ_CRYPTO_H_ + +#include <czmq.h> + +#define PROPERTY_KEYS_FILE_PATH "keys.file.path" +#define PROPERTY_KEYS_FILE_NAME "keys.file.name" +#define DEFAULT_KEYS_FILE_PATH "/etc/" +#define DEFAULT_KEYS_FILE_NAME "pubsub.keys" + +zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path); +int generate_sha256_hash(char* text, unsigned char* digest); +int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext); + +#endif http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_api/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_api/CMakeLists.txt b/pubsub/pubsub_api/CMakeLists.txt new file mode 100644 index 0000000..5ceb291 --- /dev/null +++ b/pubsub/pubsub_api/CMakeLists.txt @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +#api target +add_library(pubsub_api INTERFACE) + +target_include_directories(framework INTERFACE + $<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include> + $<INSTALL_INTERFACE:include/celix/pubsub> +) + +#install api +install(TARGETS pubsub_api EXPORT celix DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT pubsub) +install(DIRECTORY include/ DESTINATION include/celix/pubsub COMPONENT pubsub) + +#Setup target aliases to match external usage +add_library(Celix::pubsub_api ALIAS pubsub_api) + http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_api/include/pubsub/publisher.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_api/include/pubsub/publisher.h b/pubsub/pubsub_api/include/pubsub/publisher.h new file mode 100644 index 0000000..3eec149 --- /dev/null +++ b/pubsub/pubsub_api/include/pubsub/publisher.h @@ -0,0 +1,88 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * publisher.h + * + * \date Jan 7, 2016 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#ifndef __PUBSUB_PUBLISHER_H_ +#define __PUBSUB_PUBLISHER_H_ + +#include <stdlib.h> + +#define PUBSUB_PUBLISHER_SERVICE_NAME "pubsub.publisher" +#define PUBSUB_PUBLISHER_SERVICE_VERSION "2.0.0" + +//properties +#define PUBSUB_PUBLISHER_TOPIC "pubsub.topic" +#define PUBSUB_PUBLISHER_SCOPE "pubsub.scope" +#define PUBSUB_PUBLISHER_STRATEGY "pubsub.strategy" +#define PUBSUB_PUBLISHER_CONFIG "pubsub.config" + +#define PUBSUB_PUBLISHER_SCOPE_DEFAULT "default" +//flags +#define PUBSUB_PUBLISHER_FIRST_MSG 01 +#define PUBSUB_PUBLISHER_PART_MSG 02 +#define PUBSUB_PUBLISHER_LAST_MSG 04 + +struct pubsub_release_callback_struct { + void *handle; + void (*release)(char *buf, void *handle); +}; +typedef struct pubsub_release_callback_struct pubsub_release_callback_t; +typedef struct pubsub_release_callback_struct* pubsub_release_callback_pt; + + +struct pubsub_publisher { + void *handle; + + /** + * Every msg is identifiable by msg type string. Because masg type string are performance wise not preferable (string compares), + * a "local" (int / platform dependent) unique id will be generated runtime + * with use of a distributed key/value store or communication between participation parties. + * this is called the local message type id. This local message type id can be requested with the localMsgIdForMsgType method. + * When return is successful the msgTypeId is always greater than 0. (Note this can be used to specify/detect uninitialized msg type ids in the consumer code). + * + * Returns 0 on success. + */ + int (*localMsgTypeIdForMsgType)(void *handle, const char *msgType, unsigned int *msgTypeId); + + /** + * send is a async function, but the msg can be safely deleted after send returns. + * Returns 0 on success. + */ + int (*send)(void *handle, unsigned int msgTypeId, const void *msg); + + + /** + * sendMultipart is a async function, but the msg can be safely deleted after send returns. + * The first (primary) message of a multipart message must have the flag PUBLISHER_PRIMARY_MSG + * The last message of a multipart message must have the flag PUBLISHER_LAST_MSG + * Returns 0 on success. + */ + int (*sendMultipart)(void *handle, unsigned int msgTypeId, const void *msg, int flags); + +}; +typedef struct pubsub_publisher pubsub_publisher_t; +typedef struct pubsub_publisher* pubsub_publisher_pt; + +#endif // __PUBSUB_PUBLISHER_H_ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_api/include/pubsub/subscriber.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_api/include/pubsub/subscriber.h b/pubsub/pubsub_api/include/pubsub/subscriber.h new file mode 100644 index 0000000..5d87b8a --- /dev/null +++ b/pubsub/pubsub_api/include/pubsub/subscriber.h @@ -0,0 +1,75 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * subscriber.h + * + * \date Jan 7, 2016 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#ifndef __PUBSUB_SUBSCRIBER_H_ +#define __PUBSUB_SUBSCRIBER_H_ + +#include <stdbool.h> + +#define PUBSUB_SUBSCRIBER_SERVICE_NAME "pubsub.subscriber" +#define PUBSUB_SUBSCRIBER_SERVICE_VERSION "2.0.0" + +//properties +#define PUBSUB_SUBSCRIBER_TOPIC "pubsub.topic" +#define PUBSUB_SUBSCRIBER_SCOPE "pubsub.scope" +#define PUBSUB_SUBSCRIBER_STRATEGY "pubsub.strategy" +#define PUBSUB_SUBSCRIBER_CONFIG "pubsub.config" + +#define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT "default" + +struct pubsub_multipart_callbacks_struct { + void *handle; + int (*localMsgTypeIdForMsgType)(void *handle, const char *msgType, unsigned int *msgId); + int (*getMultipart)(void *handle, unsigned int msgTypeId, bool retain, void **part); +}; +typedef struct pubsub_multipart_callbacks_struct pubsub_multipart_callbacks_t; +typedef struct pubsub_multipart_callbacks_struct* pubsub_multipart_callbacks_pt; + +struct pubsub_subscriber_struct { + void *handle; + + /** + * When a new message for a topic is available the receive will be called. + * + * msgType contains fully qualified name of the type and msgTypeId is a local id which presents the type for performance reasons. + * Release can be used to instruct the pubsubadmin to release (free) the message when receive function returns. Set it to false to take + * over ownership of the msg (e.g. take the responsibility to free it). + * + * The callbacks argument is only valid inside the receive function, use the getMultipart callback, with retain=true, to keep multipart messages in memory. + * results of the localMsgTypeIdForMsgType callback are valid during the complete lifecycle of the component, not just a single receive call. + * + * Return 0 implies a successful handling. If return is not 0, the msg will always be released by the pubsubadmin. + * + * this method can be NULL. + */ + int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release); + +}; +typedef struct pubsub_subscriber_struct pubsub_subscriber_t; +typedef struct pubsub_subscriber_struct* pubsub_subscriber_pt; + + +#endif // __PUBSUB_SUBSCRIBER_H_ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h b/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h deleted file mode 100644 index bd39fc0..0000000 --- a/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h +++ /dev/null @@ -1,36 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ - -#ifndef PUBLISHER_ENDPOINT_ANNOUNCE_H_ -#define PUBLISHER_ENDPOINT_ANNOUNCE_H_ - -#include "pubsub_endpoint.h" - -struct publisher_endpoint_announce { - void *handle; - celix_status_t (*announcePublisher)(void *handle, pubsub_endpoint_pt pubEP); - celix_status_t (*removePublisher)(void *handle, pubsub_endpoint_pt pubEP); - celix_status_t (*interestedInTopic)(void* handle, const char *scope, const char *topic); - celix_status_t (*uninterestedInTopic)(void* handle, const char *scope, const char *topic); -}; - -typedef struct publisher_endpoint_announce *publisher_endpoint_announce_pt; - - -#endif /* PUBLISHER_ENDPOINT_ANNOUNCE_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/pubsub_admin.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/pubsub_admin.h b/pubsub/pubsub_common/public/include/pubsub_admin.h deleted file mode 100644 index f24d825..0000000 --- a/pubsub/pubsub_common/public/include/pubsub_admin.h +++ /dev/null @@ -1,72 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_admin.h - * - * \date Sep 30, 2011 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#ifndef PUBSUB_ADMIN_H_ -#define PUBSUB_ADMIN_H_ - -#include "service_reference.h" - -#include "pubsub_common.h" -#include "pubsub_endpoint.h" - -#define PSA_IP "PSA_IP" -#define PSA_ITF "PSA_INTERFACE" -#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX" - -#define PUBSUB_ADMIN_TYPE_KEY "pubsub_admin.type" - -typedef struct pubsub_admin *pubsub_admin_pt; - -struct pubsub_admin_service { - pubsub_admin_pt admin; - - celix_status_t (*addSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); - celix_status_t (*removeSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); - - celix_status_t (*addPublication)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); - celix_status_t (*removePublication)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); - - celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* scope, char* topic); - celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* scope, char* topic); - - /* Match principle: - * - A full matching pubsub_admin gives 200 points - * - A full matching serializer gives 100 points - * - If QoS = sample - * - fallback pubsub_admin order of selection is: udp_mc, zmq. Points allocation is 100,75. - * - fallback serializers order of selection is: json, void. Points allocation is 30,20. - * - If QoS = control - * - fallback pubsub_admin order of selection is: zmq,udp_mc. Points allocation is 100,75. - * - fallback serializers order of selection is: json, void. Points allocation is 30,20. - * - If nothing is specified, QoS = sample is assumed, so the same score applies, just divided by two. - * - */ - celix_status_t (*matchEndpoint)(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score); -}; - -typedef struct pubsub_admin_service *pubsub_admin_service_pt; - -#endif /* PUBSUB_ADMIN_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/pubsub_admin_match.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/pubsub_admin_match.h b/pubsub/pubsub_common/public/include/pubsub_admin_match.h deleted file mode 100644 index e95ca7d..0000000 --- a/pubsub/pubsub_common/public/include/pubsub_admin_match.h +++ /dev/null @@ -1,40 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ - - -#ifndef PUBSUB_ADMIN_MATCH_H_ -#define PUBSUB_ADMIN_MATCH_H_ - -#include "celix_errno.h" -#include "properties.h" -#include "array_list.h" - -#include "pubsub_serializer.h" - -#define QOS_ATTRIBUTE_KEY "attribute.qos" -#define QOS_TYPE_SAMPLE "sample" /* A.k.a. unreliable connection */ -#define QOS_TYPE_CONTROL "control" /* A.k.a. reliable connection */ - -#define PUBSUB_ADMIN_FULL_MATCH_SCORE 200.0F -#define SERIALIZER_FULL_MATCH_SCORE 100.0F - -celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char *pubsub_admin_type, array_list_pt serializerList, double *score); -celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, pubsub_serializer_service_t **serSvc); - -#endif /* PUBSUB_ADMIN_MATCH_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/pubsub_common.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/pubsub_common.h b/pubsub/pubsub_common/public/include/pubsub_common.h deleted file mode 100644 index 5dfd8fd..0000000 --- a/pubsub/pubsub_common/public/include/pubsub_common.h +++ /dev/null @@ -1,52 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_common.h - * - * \date Sep 17, 2015 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#ifndef PUBSUB_COMMON_H_ -#define PUBSUB_COMMON_H_ - -#define PUBSUB_SERIALIZER_SERVICE "pubsub_serializer" -#define PUBSUB_ADMIN_SERVICE "pubsub_admin" -#define PUBSUB_DISCOVERY_SERVICE "pubsub_discovery" -#define PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE "pubsub_tm_announce_publisher" - -#define PUBSUB_ANY_SUB_TOPIC "any" - -#define PUBSUB_BUNDLE_ID "bundle.id" - -#define MAX_SCOPE_LEN 1024 -#define MAX_TOPIC_LEN 1024 - -struct pubsub_msg_header{ - char topic[MAX_TOPIC_LEN]; - unsigned int type; - unsigned char major; - unsigned char minor; -}; - -typedef struct pubsub_msg_header* pubsub_msg_header_pt; - - -#endif /* PUBSUB_COMMON_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/pubsub_endpoint.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/pubsub_endpoint.h b/pubsub/pubsub_common/public/include/pubsub_endpoint.h deleted file mode 100644 index 8a979eb..0000000 --- a/pubsub/pubsub_common/public/include/pubsub_endpoint.h +++ /dev/null @@ -1,58 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_endpoint.h - * - * \date Sep 21, 2015 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#ifndef PUBSUB_ENDPOINT_H_ -#define PUBSUB_ENDPOINT_H_ - -#include "service_reference.h" -#include "listener_hook_service.h" -#include "properties.h" - -#include "publisher.h" -#include "subscriber.h" - -struct pubsub_endpoint { - char *frameworkUUID; - char *scope; - char *topic; - long serviceID; - char* endpoint; - bool is_secure; - properties_pt topic_props; -}; - -typedef struct pubsub_endpoint *pubsub_endpoint_pt; - -celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp); -celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference,pubsub_endpoint_pt* psEp, bool isPublisher); -celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher); -celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out); -celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp); -bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2); - -char *createScopeTopicKey(const char* scope, const char* topic); - -#endif /* PUBSUB_ENDPOINT_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/pubsub_serializer.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/pubsub_serializer.h b/pubsub/pubsub_common/public/include/pubsub_serializer.h deleted file mode 100644 index 4489fa4..0000000 --- a/pubsub/pubsub_common/public/include/pubsub_serializer.h +++ /dev/null @@ -1,66 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_serializer.h - * - * \date Mar 24, 2017 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#ifndef PUBSUB_SERIALIZER_SERVICE_H_ -#define PUBSUB_SERIALIZER_SERVICE_H_ - -#include "service_reference.h" -#include "hash_map.h" - -#include "pubsub_common.h" - -#define PUBSUB_SERIALIZER_TYPE_KEY "pubsub_serializer.type" - -/** - * There should be a pubsub_serializer_t - * per msg type (msg id) per bundle - * - * The pubsub_serializer_service can create - * a serializer_map per bundle. Potentially using - * the extender pattern. - */ - -typedef struct pubsub_msg_serializer { - void* handle; - unsigned int msgId; - const char* msgName; - version_pt msgVersion; - - celix_status_t (*serialize)(void* handle, const void* input, void** out, size_t* outLen); - celix_status_t (*deserialize)(void* handle, const void* input, size_t inputLen, void** out); //note inputLen can be 0 if predefined size is not needed - void (*freeMsg)(void* handle, void* msg); - -} pubsub_msg_serializer_t; - -typedef struct pubsub_serializer_service { - void* handle; - - celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, hash_map_pt* serializerMap); - celix_status_t (*destroySerializerMap)(void* handle, hash_map_pt serializerMap); - -} pubsub_serializer_service_t; - -#endif /* PUBSUB_SERIALIZER_SERVICE_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor b/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor deleted file mode 100644 index c01a2fd..0000000 --- a/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor +++ /dev/null @@ -1,10 +0,0 @@ -:header -type=interface -name=pubsub_topic_info -version=1.0.0 -:annotations -:types -:methods -getParticipantsNumber(t)i=getParticipantsNumber(#am=handle;Pt#am=pre;*i)N -getSubscribersNumber(t)i=getSubscribersNumber(#am=handle;Pt#am=pre;*i)N -getPublishersNumber(t)i=getPublishersNumber(#am=handle;Pt#am=pre;*i)N http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/pubsub_utils.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/pubsub_utils.h b/pubsub/pubsub_common/public/include/pubsub_utils.h deleted file mode 100644 index aff5c72..0000000 --- a/pubsub/pubsub_common/public/include/pubsub_utils.h +++ /dev/null @@ -1,39 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_utils.h - * - * \date Sep 24, 2015 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#ifndef PUBSUB_UTILS_H_ -#define PUBSUB_UTILS_H_ - -#include "bundle_context.h" -#include "array_list.h" - -char* pubsub_getScopeFromFilter(char* bundle_filter); -char* pubsub_getTopicFromFilter(char* bundle_filter); -char* pubsub_getKeysBundleDir(bundle_context_pt ctx); -array_list_pt pubsub_getTopicsFromString(char* string); - - -#endif /* PUBSUB_UTILS_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/src/pubsub_admin_match.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/src/pubsub_admin_match.c b/pubsub/pubsub_common/public/src/pubsub_admin_match.c deleted file mode 100644 index 2a695c1..0000000 --- a/pubsub/pubsub_common/public/src/pubsub_admin_match.c +++ /dev/null @@ -1,320 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ - - -#include <string.h> -#include "service_reference.h" - -#include "pubsub_admin.h" - -#include "pubsub_admin_match.h" - -#define KNOWN_PUBSUB_ADMIN_NUM 2 -#define KNOWN_SERIALIZER_NUM 2 - -static char* qos_sample_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = {"udp_mc","zmq"}; -static char* qos_sample_serializer_prio_list[KNOWN_SERIALIZER_NUM] = {"json","void"}; - -static char* qos_control_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = {"zmq","udp_mc"}; -static char* qos_control_serializer_prio_list[KNOWN_SERIALIZER_NUM] = {"json","void"}; - -static double qos_pubsub_admin_score[KNOWN_PUBSUB_ADMIN_NUM] = {100.0F,75.0F}; -static double qos_serializer_score[KNOWN_SERIALIZER_NUM] = {30.0F,20.0F}; - -static void get_serializer_type(service_reference_pt svcRef, char **serializerType); -static void manage_service_from_reference(service_reference_pt svcRef, void **svc, bool getService); - -celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char *pubsub_admin_type, array_list_pt serializerList, double *score){ - - celix_status_t status = CELIX_SUCCESS; - double final_score = 0; - int i = 0, j = 0; - - const char *requested_admin_type = NULL; - const char *requested_serializer_type = NULL; - const char *requested_qos_type = NULL; - - if(endpoint_props!=NULL){ - requested_admin_type = properties_get(endpoint_props,PUBSUB_ADMIN_TYPE_KEY); - requested_serializer_type = properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY); - requested_qos_type = properties_get(endpoint_props,QOS_ATTRIBUTE_KEY); - } - - /* Analyze the pubsub_admin */ - if(requested_admin_type != NULL){ /* We got precise specification on the pubsub_admin we want */ - if(strncmp(requested_admin_type,pubsub_admin_type,strlen(pubsub_admin_type))==0){ //Full match - final_score += PUBSUB_ADMIN_FULL_MATCH_SCORE; - } - } - else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected PSA */ - if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){ - for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){ - if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){ - final_score += qos_pubsub_admin_score[i]; - break; - } - } - } - else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){ - for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){ - if(strncmp(qos_control_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){ - final_score += qos_pubsub_admin_score[i]; - break; - } - } - } - else{ - printf("Unknown QoS type '%s'\n",requested_qos_type); - status = CELIX_ILLEGAL_ARGUMENT; - } - } - else{ /* We got no specification: fallback to Qos=Sample, but count half the score */ - for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){ - if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){ - final_score += (qos_pubsub_admin_score[i]/2); - break; - } - } - } - - char *serializer_type = NULL; - /* Analyze the serializers */ - if(requested_serializer_type != NULL){ /* We got precise specification on the serializer we want */ - for(i=0;i<arrayList_size(serializerList);i++){ - service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,i); - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){ - final_score += SERIALIZER_FULL_MATCH_SCORE; - break; - } - } - } - } - else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected serializer */ - if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){ - bool ser_found = false; - for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ - for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ - service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j); - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ - ser_found = true; - } - } - } - if(ser_found){ - final_score += qos_serializer_score[i]; - } - } - } - else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){ - bool ser_found = false; - for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ - for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ - service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j); - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ - ser_found = true; - } - } - } - if(ser_found){ - final_score += qos_serializer_score[i]; - } - } - } - else{ - printf("Unknown QoS type '%s'\n",requested_qos_type); - status = CELIX_ILLEGAL_ARGUMENT; - } - } - else{ /* We got no specification: fallback to Qos=Sample, but count half the score */ - bool ser_found = false; - for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ - for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ - service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j); - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ - ser_found = true; - } - } - } - if(ser_found){ - final_score += (qos_serializer_score[i]/2); - } - } - } - - *score = final_score; - - printf("Score for pair <%s,%s> = %f\n",pubsub_admin_type,serializer_type,final_score); - - return status; -} - -celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, pubsub_serializer_service_t **serSvc){ - celix_status_t status = CELIX_SUCCESS; - - int i = 0, j = 0; - - const char *requested_serializer_type = NULL; - const char *requested_qos_type = NULL; - - if (endpoint_props != NULL){ - requested_serializer_type = properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY); - requested_qos_type = properties_get(endpoint_props,QOS_ATTRIBUTE_KEY); - } - - service_reference_pt svcRef = NULL; - void *svc = NULL; - - /* Analyze the serializers */ - if(requested_serializer_type != NULL){ /* We got precise specification on the serializer we want */ - for(i=0;i<arrayList_size(serializerList);i++){ - svcRef = (service_reference_pt)arrayList_get(serializerList,i); - char *serializer_type = NULL; - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){ - manage_service_from_reference(svcRef, &svc,true); - if(svc==NULL){ - printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); - status = CELIX_SERVICE_EXCEPTION; - } - *serSvc = svc; - break; - } - } - } - } - else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected serializer */ - if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){ - bool ser_found = false; - for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ - for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ - svcRef = (service_reference_pt)arrayList_get(serializerList,j); - char *serializer_type = NULL; - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ - manage_service_from_reference(svcRef, &svc,true); - if(svc==NULL){ - printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); - status = CELIX_SERVICE_EXCEPTION; - } - else{ - *serSvc = svc; - ser_found = true; - printf("Selected %s serializer as best for QoS=%s\n",qos_sample_serializer_prio_list[i],QOS_TYPE_SAMPLE); - } - } - } - } - } - } - else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){ - bool ser_found = false; - for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ - for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ - svcRef = (service_reference_pt)arrayList_get(serializerList,j); - char *serializer_type = NULL; - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ - manage_service_from_reference(svcRef, &svc,true); - if(svc==NULL){ - printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); - status = CELIX_SERVICE_EXCEPTION; - } - else{ - *serSvc = svc; - ser_found = true; - printf("Selected %s serializer as best for QoS=%s\n",qos_control_serializer_prio_list[i],QOS_TYPE_CONTROL); - } - } - } - } - } - } - else{ - printf("Unknown QoS type '%s'\n",requested_qos_type); - status = CELIX_ILLEGAL_ARGUMENT; - } - } - else{ /* We got no specification: fallback to Qos=Sample, but count half the score */ - bool ser_found = false; - for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ - for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ - svcRef = (service_reference_pt)arrayList_get(serializerList,j); - char *serializer_type = NULL; - get_serializer_type(svcRef, &serializer_type); - if(serializer_type != NULL){ - if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ - manage_service_from_reference(svcRef, &svc,true); - if(svc==NULL){ - printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); - status = CELIX_SERVICE_EXCEPTION; - } - else{ - *serSvc = svc; - ser_found = true; - printf("Selected %s serializer as best without any specification\n",qos_sample_serializer_prio_list[i]); - } - } - } - } - } - } - - if(svc!=NULL && svcRef!=NULL){ - manage_service_from_reference(svcRef, svc, false); - } - - return status; -} - -static void get_serializer_type(service_reference_pt svcRef, char **serializerType){ - - const char *serType = NULL; - serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY,&serType); - if(serType != NULL){ - *serializerType = (char*)serType; - } - else{ - printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",svcRef); - *serializerType = NULL; - } -} - -static void manage_service_from_reference(service_reference_pt svcRef, void **svc, bool getService){ - bundle_context_pt context = NULL; - bundle_pt bundle = NULL; - serviceReference_getBundle(svcRef, &bundle); - bundle_getContext(bundle, &context); - if(getService){ - bundleContext_getService(context, svcRef, svc); - } - else{ - bundleContext_ungetService(context, svcRef, NULL); - } -} http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/src/pubsub_endpoint.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/src/pubsub_endpoint.c b/pubsub/pubsub_common/public/src/pubsub_endpoint.c deleted file mode 100644 index c3fd293..0000000 --- a/pubsub/pubsub_common/public/src/pubsub_endpoint.c +++ /dev/null @@ -1,254 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * endpoint_description.c - * - * \date 25 Jul 2014 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#include <string.h> -#include <stdlib.h> - -#include "celix_errno.h" -#include "celix_log.h" - -#include "pubsub_common.h" -#include "pubsub_endpoint.h" -#include "constants.h" - -#include "pubsub_utils.h" - - -static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps); -static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher); - -static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps){ - - if (fwUUID != NULL) { - psEp->frameworkUUID = strdup(fwUUID); - } - - if (scope != NULL) { - psEp->scope = strdup(scope); - } - - if (topic != NULL) { - psEp->topic = strdup(topic); - } - - psEp->serviceID = serviceId; - - if(endpoint != NULL) { - psEp->endpoint = strdup(endpoint); - } - - if(topic_props != NULL){ - if(cloneProps){ - properties_copy(topic_props, &(psEp->topic_props)); - } - else{ - psEp->topic_props = topic_props; - } - } -} - -static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher){ - - properties_pt topic_props = NULL; - - bool isSystemBundle = false; - bundle_isSystemBundle(bundle, &isSystemBundle); - long bundleId = -1; - bundle_isSystemBundle(bundle, &isSystemBundle); - bundle_getBundleId(bundle,&bundleId); - - if(isSystemBundle == false) { - - char *bundleRoot = NULL; - char* topicPropertiesPath = NULL; - bundle_getEntry(bundle, ".", &bundleRoot); - - if(bundleRoot != NULL){ - - asprintf(&topicPropertiesPath, "%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher?"pub":"sub", topic); - topic_props = properties_load(topicPropertiesPath); - if(topic_props==NULL){ - printf("PSEP: Could not load properties for %s on topic %s, bundleId=%ld\n", isPublisher?"publication":"subscription", topic,bundleId); - } - - free(topicPropertiesPath); - free(bundleRoot); - } - } - - return topic_props; -} - -celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp){ - celix_status_t status = CELIX_SUCCESS; - - *psEp = calloc(1, sizeof(**psEp)); - - pubsubEndpoint_setFields(*psEp, fwUUID, scope, topic, serviceId, endpoint, topic_props, true); - - return status; - -} - -celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out){ - celix_status_t status = CELIX_SUCCESS; - - *out = calloc(1,sizeof(**out)); - - pubsubEndpoint_setFields(*out, in->frameworkUUID, in->scope, in->topic, in->serviceID, in->endpoint, in->topic_props, true); - - return status; - -} - -celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* psEp, bool isPublisher){ - celix_status_t status = CELIX_SUCCESS; - - pubsub_endpoint_pt ep = calloc(1,sizeof(*ep)); - - bundle_pt bundle = NULL; - bundle_context_pt ctxt = NULL; - const char* fwUUID = NULL; - serviceReference_getBundle(reference,&bundle); - bundle_getContext(bundle,&ctxt); - bundleContext_getProperty(ctxt,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); - - const char* scope = NULL; - serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_SCOPE,&scope); - - const char* topic = NULL; - serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_TOPIC,&topic); - - const char* serviceId = NULL; - serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId); - - /* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */ - properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher); - - pubsubEndpoint_setFields(ep, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, strtol(serviceId,NULL,10), NULL, topic_props, false); - - if (!ep->frameworkUUID || !ep->serviceID || !ep->scope || !ep->topic) { - fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!."); - status = CELIX_BUNDLE_EXCEPTION; - pubsubEndpoint_destroy(ep); - *psEp = NULL; - } - else{ - *psEp = ep; - } - - return status; - -} - -celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher){ - celix_status_t status = CELIX_SUCCESS; - - const char* fwUUID=NULL; - bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); - - if(fwUUID==NULL){ - return CELIX_BUNDLE_EXCEPTION; - } - - char* topic = pubsub_getTopicFromFilter(info->filter); - if(topic==NULL){ - return CELIX_BUNDLE_EXCEPTION; - } - - *psEp = calloc(1, sizeof(**psEp)); - - char* scope = pubsub_getScopeFromFilter(info->filter); - if(scope == NULL) { - scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT); - } - - bundle_pt bundle = NULL; - long bundleId = -1; - bundleContext_getBundle(info->context,&bundle); - - bundle_getBundleId(bundle,&bundleId); - - properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher); - - /* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */ - pubsubEndpoint_setFields(*psEp, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, bundleId, NULL, topic_props, false); - - free(topic); - free(scope); - - - return status; -} - -celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){ - - if(psEp->frameworkUUID!=NULL){ - free(psEp->frameworkUUID); - psEp->frameworkUUID = NULL; - } - - if(psEp->scope!=NULL){ - free(psEp->scope); - psEp->scope = NULL; - } - - if(psEp->topic!=NULL){ - free(psEp->topic); - psEp->topic = NULL; - } - - if(psEp->endpoint!=NULL){ - free(psEp->endpoint); - psEp->endpoint = NULL; - } - - if(psEp->topic_props != NULL){ - properties_destroy(psEp->topic_props); - } - - free(psEp); - - return CELIX_SUCCESS; - -} - -bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){ - - return ((strcmp(psEp1->frameworkUUID,psEp2->frameworkUUID)==0) && - (strcmp(psEp1->scope,psEp2->scope)==0) && - (strcmp(psEp1->topic,psEp2->topic)==0) && - (psEp1->serviceID == psEp2->serviceID) /*&& - ((psEp1->endpoint==NULL && psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/ - ); -} - -char *createScopeTopicKey(const char* scope, const char* topic) { - char *result = NULL; - asprintf(&result, "%s:%s", scope, topic); - - return result; -} http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/src/pubsub_utils.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/src/pubsub_utils.c b/pubsub/pubsub_common/public/src/pubsub_utils.c deleted file mode 100644 index abc5ae6..0000000 --- a/pubsub/pubsub_common/public/src/pubsub_utils.c +++ /dev/null @@ -1,170 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_utils.c - * - * \date Sep 24, 2015 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#include <string.h> -#include <stdlib.h> - -#include "constants.h" - -#include "pubsub_common.h" -#include "publisher.h" -#include "pubsub_utils.h" - -#include "array_list.h" -#include "bundle.h" - -#include <unistd.h> -#include <sys/types.h> -#include <sys/stat.h> - -#define MAX_KEYBUNDLE_LENGTH 256 - -char* pubsub_getScopeFromFilter(char* bundle_filter){ - - char* scope = NULL; - - char* filter = strdup(bundle_filter); - - char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS); - if(oc!=NULL){ - oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1; - if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){ - - char* scopes = strstr(filter,PUBSUB_PUBLISHER_SCOPE); - if(scopes!=NULL){ - - scopes+=strlen(PUBSUB_PUBLISHER_SCOPE)+1; - char* bottom=strchr(scopes,')'); - *bottom='\0'; - - scope=strdup(scopes); - } else { - scope=strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT); - } - } - } - - free(filter); - - return scope; -} - -char* pubsub_getTopicFromFilter(char* bundle_filter){ - - char* topic = NULL; - - char* filter = strdup(bundle_filter); - - char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS); - if(oc!=NULL){ - oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1; - if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){ - - char* topics = strstr(filter,PUBSUB_PUBLISHER_TOPIC); - if(topics!=NULL){ - - topics+=strlen(PUBSUB_PUBLISHER_TOPIC)+1; - char* bottom=strchr(topics,')'); - *bottom='\0'; - - topic=strdup(topics); - - } - } - } - - free(filter); - - return topic; - -} - -array_list_pt pubsub_getTopicsFromString(char* string){ - - array_list_pt topic_list = NULL; - arrayList_create(&topic_list); - - char* topics = strdup(string); - - char* topic = strtok(topics,",;|# "); - arrayList_add(topic_list,strdup(topic)); - - while( (topic = strtok(NULL,",;|# ")) !=NULL){ - arrayList_add(topic_list,strdup(topic)); - } - - free(topics); - - return topic_list; - -} - -/** - * Loop through all bundles and look for the bundle with the keys inside. - * If no key bundle found, return NULL - * - * Caller is responsible for freeing the object - */ -char* pubsub_getKeysBundleDir(bundle_context_pt ctx) -{ - array_list_pt bundles = NULL; - bundleContext_getBundles(ctx, &bundles); - int nrOfBundles = arrayList_size(bundles); - long bundle_id = -1; - char* result = NULL; - - for (int i = 0; i < nrOfBundles; i++){ - bundle_pt b = arrayList_get(bundles, i); - - /* Skip bundle 0 (framework bundle) since it has no path nor revisions */ - bundle_getBundleId(b, &bundle_id); - if(bundle_id==0){ - continue; - } - - char* dir = NULL; - bundle_getEntry(b, ".", &dir); - - char cert_dir[MAX_KEYBUNDLE_LENGTH]; - snprintf(cert_dir, MAX_KEYBUNDLE_LENGTH, "%s/META-INF/keys", dir); - - struct stat s; - int err = stat(cert_dir, &s); - if (err != -1){ - if (S_ISDIR(s.st_mode)){ - result = dir; - break; - } - } - - free(dir); - } - - arrayList_destroy(bundles); - - return result; -} -
