Repository: celix Updated Branches: refs/heads/develop ea11a7851 -> e17228813
Fixed potential deadlocks Coverity issues Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/e1722881 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/e1722881 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/e1722881 Branch: refs/heads/develop Commit: e1722881361fcb62b92e1f5ba104ed29f88efbb1 Parents: ea11a78 Author: gricciardi <[email protected]> Authored: Thu Jun 22 11:51:29 2017 +0200 Committer: gricciardi <[email protected]> Committed: Thu Jun 22 11:51:29 2017 +0200 ---------------------------------------------------------------------- .../private/include/pubsub_admin_impl.h | 5 ++++ .../private/src/pubsub_admin_impl.c | 4 +-- .../private/src/topic_publication.c | 30 +++++++++++++++----- .../private/src/topic_subscription.c | 22 ++++++++------ 4 files changed, 43 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/e1722881/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h index 3c36986..ccfa9e6 100644 --- a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h +++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h @@ -74,6 +74,11 @@ struct pubsub_admin { unsigned int maxPort; }; +/* Note: correct locking order is + * 1. subscriptionsLock + * 2. publications locks + */ + celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin); celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin); http://git-wip-us.apache.org/repos/asf/celix/blob/e1722881/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c index 5ccee2c..953ce16 100644 --- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c +++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c @@ -323,6 +323,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint return pubsubAdmin_addAnySubscription(admin,subEP); } + celixThreadMutex_lock(&admin->subscriptionsLock); /* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */ celixThreadMutex_lock(&admin->localPublicationsLock); celixThreadMutex_lock(&admin->externalPublicationsLock); @@ -378,9 +379,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint } if(status==CELIX_SUCCESS){ - celixThreadMutex_lock(&admin->subscriptionsLock); hashMap_put(admin->subscriptions,strdup(scope_topic),subscription); - celixThreadMutex_unlock(&admin->subscriptionsLock); } } @@ -392,6 +391,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint free(scope_topic); celixThreadMutex_unlock(&admin->externalPublicationsLock); celixThreadMutex_unlock(&admin->localPublicationsLock); + celixThreadMutex_unlock(&admin->subscriptionsLock); return status; http://git-wip-us.apache.org/repos/asf/celix/blob/e1722881/pubsub/pubsub_admin_zmq/private/src/topic_publication.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c index 28bf56e..2eaad97 100644 --- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c +++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c @@ -63,12 +63,13 @@ struct topic_publication { zsock_t* zmq_socket; + celix_thread_mutex_t socket_lock; //Protects zmq_socket access zcert_t * zmq_cert; char* endpoint; service_registration_pt svcFactoryReg; array_list_pt pub_ep_list; //List<pubsub_endpoint> hash_map_pt boundServices; //<bundle_pt,bound_service> - celix_thread_mutex_t tp_lock; + celix_thread_mutex_t tp_lock; // Protects topic_publication data structure pubsub_serializer_service_t* serializerSvc; }; @@ -79,11 +80,19 @@ typedef struct publish_bundle_bound_service { char *topic; pubsub_msg_serializer_map_t* map; unsigned short getCount; - celix_thread_mutex_t mp_lock; + celix_thread_mutex_t mp_lock; //Protects publish_bundle_bound_service data structure bool mp_send_in_progress; array_list_pt mp_parts; } publish_bundle_bound_service_t; +/* Note: correct locking order is + * 1. tp_lock + * 2. mp_lock + * 3. socket_lock + * + * tp_lock and socket_lock are independent. + */ + typedef struct pubsub_msg { pubsub_msg_header_pt header; char* payload; @@ -211,6 +220,8 @@ celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p pub->zmq_socket = socket; pub->serializerSvc = NULL; + celixThreadMutex_create(&(pub->socket_lock),NULL); + #ifdef BUILD_WITH_ZMQ_SECURITY if (pubEP->is_secure){ pub->zmq_cert = pub_cert; @@ -241,7 +252,6 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){ hashMap_destroy(pub->boundServices,false,false); pub->svcFactoryReg = NULL; - zsock_destroy(&(pub->zmq_socket)); #ifdef BUILD_WITH_ZMQ_SECURITY zcert_destroy(&(pub->zmq_cert)); #endif @@ -250,6 +260,12 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){ celixThreadMutex_destroy(&(pub->tp_lock)); + celixThreadMutex_lock(&(pub->socket_lock)); + zsock_destroy(&(pub->zmq_socket)); + celixThreadMutex_unlock(&(pub->socket_lock)); + + celixThreadMutex_destroy(&(pub->socket_lock)); + free(pub); return status; @@ -570,16 +586,16 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy } else{ arrayList_add(bound->mp_parts,msg); - celixThreadMutex_lock(&(bound->parent->tp_lock)); + celixThreadMutex_lock(&(bound->parent->socket_lock)); snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts); bound->mp_send_in_progress = false; - celixThreadMutex_unlock(&(bound->parent->tp_lock)); + celixThreadMutex_unlock(&(bound->parent->socket_lock)); } break; case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG: //Normal send case - celixThreadMutex_lock(&(bound->parent->tp_lock)); + celixThreadMutex_lock(&(bound->parent->socket_lock)); snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true); - celixThreadMutex_unlock(&(bound->parent->tp_lock)); + celixThreadMutex_unlock(&(bound->parent->socket_lock)); break; default: printf("TP: ERROR: Invalid MP flags combination\n"); http://git-wip-us.apache.org/repos/asf/celix/blob/e1722881/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c index 1f42fa3..409c7a5 100644 --- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c +++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c @@ -61,12 +61,12 @@ struct topic_subscription { zsock_t* zmq_socket; zcert_t * zmq_cert; zcert_t * zmq_pub_cert; - pthread_mutex_t socket_lock; + pthread_mutex_t socket_lock; //Protects zmq_socket access service_tracker_pt tracker; array_list_pt sub_ep_list; celix_thread_t recv_thread; bool running; - celix_thread_mutex_t ts_lock; + celix_thread_mutex_t ts_lock; //Protects topic_subscription data structure access bundle_context_pt context; hash_map_pt msgSerializerMapMap; // key = service ptr, value = pubsub_msg_serializer_map_t* @@ -80,6 +80,11 @@ struct topic_subscription { pubsub_serializer_service_t* serializerSvc; }; +/* Note: correct locking order is + * 1. socket_lock + * 2. ts_lock + */ + typedef struct complete_zmq_msg { zframe_t* header; zframe_t* payload; @@ -284,17 +289,18 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ celixThreadMutex_unlock(&ts->pendingDisconnections_lock); celixThreadMutex_destroy(&ts->pendingDisconnections_lock); - celixThreadMutex_lock(&ts->socket_lock); - zsock_destroy(&(ts->zmq_socket)); #ifdef BUILD_WITH_ZMQ_SECURITY zcert_destroy(&(ts->zmq_cert)); zcert_destroy(&(ts->zmq_pub_cert)); #endif - celixThreadMutex_unlock(&ts->socket_lock); - celixThreadMutex_destroy(&ts->socket_lock); celixThreadMutex_unlock(&ts->ts_lock); + celixThreadMutex_lock(&ts->socket_lock); + zsock_destroy(&(ts->zmq_socket)); + celixThreadMutex_unlock(&ts->socket_lock); + celixThreadMutex_destroy(&ts->socket_lock); + free(ts); @@ -623,8 +629,6 @@ static void* zmq_recv_thread_func(void * arg) { zframe_destroy(&headerMsg); } else { - celixThreadMutex_lock(&sub->ts_lock); - //Let's fetch all the messages from the socket array_list_pt msg_list = NULL; arrayList_create(&msg_list); @@ -669,8 +673,8 @@ static void* zmq_recv_thread_func(void * arg) { } } + celixThreadMutex_lock(&sub->ts_lock); process_msg(sub, msg_list); - celixThreadMutex_unlock(&sub->ts_lock); }
