Repository: celix Updated Branches: refs/heads/develop cf294f3c0 -> b0477babb
Changed ZMQ blocking read to non-blocking read to prevent lock up when all publishers are removed Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/b0477bab Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/b0477bab Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/b0477bab Branch: refs/heads/develop Commit: b0477babb58a5c12aee2385eb7c465e653a08921 Parents: cf294f3 Author: Erjan Altena <erjanalt...@gmail.com> Authored: Fri Feb 9 10:30:35 2018 +0100 Committer: Erjan Altena <erjanalt...@gmail.com> Committed: Fri Feb 9 10:30:35 2018 +0100 ---------------------------------------------------------------------- pubsub/README.md | 9 ++++++ .../pubsub_admin_zmq/src/topic_subscription.c | 32 ++++++++++++++++++-- 2 files changed, 38 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/b0477bab/pubsub/README.md ---------------------------------------------------------------------- diff --git a/pubsub/README.md b/pubsub/README.md index 1b533be..ff33690 100644 --- a/pubsub/README.md +++ b/pubsub/README.md @@ -82,3 +82,12 @@ For ZeroMQ without encryption, start here 1. Run `cd deploy/pubsub/pubsub_subscriber_zmq` 1. Run `cat ~/pubsub.conf >> config.properties` (only for ZeroMQ with encryption) 1. Run `sh run.sh` + +### Properties PSA ZMQ + +Some properties can be set to configure the PSA-ZMQ. If not configured defaults will be used. These +properties can be set in the config.properties file (<PROPERTY>=<VALUE> format) + +1. PSA_ZMQ_RECEIVE_TIMEOUT_MICROSEC : Set the polling interval of the ZMQ receive thread. Default 1ms +1. PSA_IP : The local IP address to be used by the ZMQ admin to publish its data. Default te first IP not on localhost +1. PSA_INTERFACE : The local ethernet interface to be used by the ZMQ admin to publish its data (ie eth0). Default the first non localhost interface \ No newline at end of file http://git-wip-us.apache.org/repos/asf/celix/blob/b0477bab/pubsub/pubsub_admin_zmq/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/src/topic_subscription.c index 9728865..8ff94d0 100644 --- a/pubsub/pubsub_admin_zmq/src/topic_subscription.c +++ b/pubsub/pubsub_admin_zmq/src/topic_subscription.c @@ -57,6 +57,11 @@ #define POLL_TIMEOUT 250 #define ZMQ_POLL_TIMEOUT_MS_ENV "ZMQ_POLL_TIMEOUT_MS" +#define PSA_ZMQ_RECEIVE_TIMEOUT_MICROSEC "PSA_ZMQ_RECEIVE_TIMEOUT_MICROSEC" +#define PSA_ZMQ_RECV_DEFAULT_TIMEOUT_STR "1000" +#define PSA_ZMQ_RECV_DEFAULT_TIMEOUT 1000 + + struct topic_subscription{ zsock_t* zmq_socket; @@ -81,6 +86,7 @@ struct topic_subscription{ celix_thread_mutex_t pendingDisconnections_lock; unsigned int nrSubscribers; + int zmqReceiveTimeout; }; typedef struct complete_zmq_msg{ @@ -109,6 +115,24 @@ static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_ms static void destroy_mp_handle(mp_handle_pt mp_handle); static void connectPendingPublishers(topic_subscription_pt sub); static void disconnectPendingPublishers(topic_subscription_pt sub); +static unsigned int get_zmq_receive_timeout(bundle_context_pt context); + + +static unsigned int get_zmq_receive_timeout(bundle_context_pt context) { + unsigned int timeout; + const char* timeout_str = NULL; + bundleContext_getPropertyWithDefault(context, + PSA_ZMQ_RECEIVE_TIMEOUT_MICROSEC, + PSA_ZMQ_RECV_DEFAULT_TIMEOUT_STR, + &timeout_str); + timeout = strtoul(timeout_str, NULL, 10); + if (timeout == 0) { + // on errror strtol returns 0 + timeout = PSA_ZMQ_RECV_DEFAULT_TIMEOUT; + } + + return timeout; +} celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* scope, char* topic, pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){ celix_status_t status = CELIX_SUCCESS; @@ -179,7 +203,7 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, ts->running = false; ts->nrSubscribers = 0; ts->serializer = best_serializer; - + ts->zmqReceiveTimeout = get_zmq_receive_timeout(bundle_context); #ifdef BUILD_WITH_ZMQ_SECURITY ts->zmq_cert = sub_cert; ts->zmq_pub_cert = pub_cert; @@ -508,9 +532,11 @@ static void* zmq_recv_thread_func(void * arg) { celixThreadMutex_lock(&sub->socket_lock); - zframe_t* headerMsg = zframe_recv(sub->zmq_socket); + zframe_t* headerMsg = zframe_recv_nowait(sub->zmq_socket); if (headerMsg == NULL) { - if (errno == EINTR) { + if(errno == EAGAIN) { + usleep(sub->zmqReceiveTimeout); + } else if (errno == EINTR) { //It means we got a signal and we have to exit... printf("PSA_ZMQ_TS: header_recv thread for topic got a signal and will exit.\n"); } else {