rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r534422059



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -144,119 +146,108 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, 
PSA_TCP_METRICS_ENABLED,
-                                                                   
PSA_TCP_DEFAULT_METRICS_ENABLED);
-    bool isEndpoint = false;
+    pubsubInterceptorsHandler_create(ctx, scope, topic, 
&sender->interceptorsHandler);
+    sender->isPassive = false;
+    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, 
PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
     char *urls = NULL;
     const char *ip = celix_bundleContext_getProperty(ctx, 
PUBSUB_TCP_PSA_IP_KEY, NULL);
-    const char *discUrl = NULL;
-    const char *staticClientEndPointUrls = NULL;
-    const char *staticServerEndPointUrls = NULL;
-
-    discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, 
PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+    const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, 
PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+    const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, 
PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
+    const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, 
PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
 
     if (topicProperties != NULL) {
         if (discUrl == NULL) {
             discUrl = celix_properties_get(topicProperties, 
PUBSUB_TCP_STATIC_DISCOVER_URL, NULL);
         }
-        /* Check if it's a static endpoint */
-        const char *endPointType = celix_properties_get(topicProperties, 
PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
-        if (endPointType != NULL) {
-            isEndpoint = true;
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
-                staticClientEndPointUrls = 
celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
-            }
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
-                staticServerEndPointUrls = discUrl;
-            }
+        if (isPassive == NULL) {
+            isPassive = celix_properties_get(topicProperties, 
PUBSUB_TCP_PASSIVE_CONFIGURED, NULL);
+        }
+        if (passiveKey == NULL) {
+            passiveKey = celix_properties_get(topicProperties, 
PUBSUB_TCP_PASSIVE_KEY, NULL);
         }
     }
+    sender->isPassive = psa_tcp_isPassive(isPassive);
 
     /* When it's an endpoint share the socket with the receiver */
-    if ((staticClientEndPointUrls != NULL) || (staticServerEndPointUrls)) {
-        celixThreadMutex_lock(&endPointStore->mutex);
-        const char *endPointUrl = (staticClientEndPointUrls) ? 
staticClientEndPointUrls : staticServerEndPointUrls;
-        pubsub_tcpHandler_t *entry = hashMap_get(endPointStore->map, 
endPointUrl);
+    if (passiveKey != NULL) {
+        celixThreadMutex_lock(&handlerStore->mutex);
+        pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, 
passiveKey);
         if (entry == NULL) {
             if (sender->socketHandler == NULL)
                 sender->socketHandler = 
pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
             entry = sender->socketHandler;
             sender->sharedSocketHandler = sender->socketHandler;
-            hashMap_put(endPointStore->map, (void *) endPointUrl, entry);
+            hashMap_put(handlerStore->map, (void *) passiveKey, entry);
         } else {
             sender->socketHandler = entry;
             sender->sharedSocketHandler = entry;
         }
-        celixThreadMutex_unlock(&endPointStore->mutex);
+        celixThreadMutex_unlock(&handlerStore->mutex);
     } else {
         sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, 
sender->logHelper);
     }
 
     if ((sender->socketHandler != NULL) && (topicProperties != NULL)) {
         long prio = celix_properties_getAsLong(topicProperties, 
PUBSUB_TCP_THREAD_REALTIME_PRIO, -1L);
         const char *sched = celix_properties_get(topicProperties, 
PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
-        long retryCnt = celix_properties_getAsLong(topicProperties, 
PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY,
-                                                   
PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
-        double timeout = celix_properties_getAsDouble(topicProperties, 
PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
-                                                                       
(!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT :
-                                                                               
        PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT);
+        long retryCnt = celix_properties_getAsLong(topicProperties, 
PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY, PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
+        double sendTimeout = celix_properties_getAsDouble(topicProperties, 
PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY, PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
+        long maxMsgSize = celix_properties_getAsLong(topicProperties, 
PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
+        long timeout = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+        sender->send_delay = celix_bundleContext_getPropertyAsLong(ctx,  
PSA_TCP_SEND_DELAY, PSA_TCP_DEFAULT_SEND_DELAY);
         pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
         pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, 
sched);
         pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned 
int) retryCnt);
-        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
+        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
+        pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) 
maxMsgSize);
+        pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, 
(passiveKey) ? true : false);

Review comment:
       When the passive connection is used, a full-duplex connection is set up.
   This means that EPOLLIN is also enabled for the publisher.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to