Repository: celix
Updated Branches:
  refs/heads/develop cf294f3c0 -> b0477babb


Changed ZMQ blocking read to non-blocking read to prevent lock up when all 
publishers are removed


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/b0477bab
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/b0477bab
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/b0477bab

Branch: refs/heads/develop
Commit: b0477babb58a5c12aee2385eb7c465e653a08921
Parents: cf294f3
Author: Erjan Altena <erjanalt...@gmail.com>
Authored: Fri Feb 9 10:30:35 2018 +0100
Committer: Erjan Altena <erjanalt...@gmail.com>
Committed: Fri Feb 9 10:30:35 2018 +0100

----------------------------------------------------------------------
 pubsub/README.md                                |  9 ++++++
 .../pubsub_admin_zmq/src/topic_subscription.c   | 32 ++++++++++++++++++--
 2 files changed, 38 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/b0477bab/pubsub/README.md
----------------------------------------------------------------------
diff --git a/pubsub/README.md b/pubsub/README.md
index 1b533be..ff33690 100644
--- a/pubsub/README.md
+++ b/pubsub/README.md
@@ -82,3 +82,12 @@ For ZeroMQ without encryption, start here
 1. Run `cd deploy/pubsub/pubsub_subscriber_zmq`
 1. Run `cat ~/pubsub.conf >> config.properties` (only for ZeroMQ with 
encryption)
 1. Run `sh run.sh`
+
+### Properties PSA ZMQ
+
+Some properties can be set to configure the PSA-ZMQ. If not configured, 
defaults will be used. These
+properties can be set in the config.properties file (`<PROPERTY>=<VALUE>` format).
+
+1. PSA_ZMQ_RECEIVE_TIMEOUT_MICROSEC : Sets the polling interval of the ZMQ 
receive thread, in microseconds. Default: 1000 (1 ms)
+1. PSA_IP : The local IP address to be used by the ZMQ admin to publish its 
data. Default: the first IP not on localhost
+1. PSA_INTERFACE : The local ethernet interface to be used by the ZMQ admin to 
publish its data (e.g. eth0). Default: the first non-localhost interface
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/b0477bab/pubsub/pubsub_admin_zmq/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/topic_subscription.c 
b/pubsub/pubsub_admin_zmq/src/topic_subscription.c
index 9728865..8ff94d0 100644
--- a/pubsub/pubsub_admin_zmq/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/src/topic_subscription.c
@@ -57,6 +57,11 @@
 #define POLL_TIMEOUT   250
 #define ZMQ_POLL_TIMEOUT_MS_ENV        "ZMQ_POLL_TIMEOUT_MS"
 
+#define PSA_ZMQ_RECEIVE_TIMEOUT_MICROSEC "PSA_ZMQ_RECEIVE_TIMEOUT_MICROSEC"
+#define PSA_ZMQ_RECV_DEFAULT_TIMEOUT_STR "1000"
+#define PSA_ZMQ_RECV_DEFAULT_TIMEOUT 1000
+
+
 struct topic_subscription{
 
        zsock_t* zmq_socket;
@@ -81,6 +86,7 @@ struct topic_subscription{
        celix_thread_mutex_t pendingDisconnections_lock;
 
        unsigned int nrSubscribers;
+       int zmqReceiveTimeout;
 };
 
 typedef struct complete_zmq_msg{
@@ -109,6 +115,24 @@ static mp_handle_pt create_mp_handle(hash_map_pt 
svc_msg_db,array_list_pt rcv_ms
 static void destroy_mp_handle(mp_handle_pt mp_handle);
 static void connectPendingPublishers(topic_subscription_pt sub);
 static void disconnectPendingPublishers(topic_subscription_pt sub);
+static unsigned int get_zmq_receive_timeout(bundle_context_pt context);
+
+
+static unsigned int get_zmq_receive_timeout(bundle_context_pt context) {
+       unsigned int timeout;
+       const char* timeout_str = NULL;
+       bundleContext_getPropertyWithDefault(context,
+                                                                               
 PSA_ZMQ_RECEIVE_TIMEOUT_MICROSEC,
+                                                                               
 PSA_ZMQ_RECV_DEFAULT_TIMEOUT_STR,
+                                                                               
 &timeout_str);
+       timeout = strtoul(timeout_str, NULL, 10);
+       if (timeout == 0) {
+               // strtoul returns 0 when no conversion could be performed; fall back to the default
+               timeout = PSA_ZMQ_RECV_DEFAULT_TIMEOUT;
+       }
+
+       return timeout;
+}
 
 celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context, char* scope, char* topic, pubsub_serializer_service_t 
*best_serializer, topic_subscription_pt* out){
        celix_status_t status = CELIX_SUCCESS;
@@ -179,7 +203,7 @@ celix_status_t 
pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
        ts->running = false;
        ts->nrSubscribers = 0;
        ts->serializer = best_serializer;
-
+       ts->zmqReceiveTimeout = get_zmq_receive_timeout(bundle_context);
 #ifdef BUILD_WITH_ZMQ_SECURITY
        ts->zmq_cert = sub_cert;
        ts->zmq_pub_cert = pub_cert;
@@ -508,9 +532,11 @@ static void* zmq_recv_thread_func(void * arg) {
 
                celixThreadMutex_lock(&sub->socket_lock);
 
-               zframe_t* headerMsg = zframe_recv(sub->zmq_socket);
+               zframe_t* headerMsg = zframe_recv_nowait(sub->zmq_socket);
                if (headerMsg == NULL) {
-                       if (errno == EINTR) {
+                       if(errno == EAGAIN) {
+                               usleep(sub->zmqReceiveTimeout);
+                       } else if (errno == EINTR) {
                                //It means we got a signal and we have to 
exit...
                                printf("PSA_ZMQ_TS: header_recv thread for 
topic got a signal and will exit.\n");
                        } else {

Reply via email to