PengZheng commented on code in PR #583:
URL: https://github.com/apache/celix/pull/583#discussion_r1246093527


##########
libs/utils/src/celix_threads.c:
##########
@@ -182,21 +175,39 @@ celix_status_t 
celixThreadCondition_timedwaitRelative(celix_thread_cond_t *cond,
 }
 #else
 celix_status_t celixThreadCondition_timedwaitRelative(celix_thread_cond_t 
*cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds) {
-    struct timespec time;
-    seconds = seconds >= 0 ? seconds : 0;
-    time = celix_gettime(CLOCK_MONOTONIC);
-    time.tv_sec += seconds;
-    if(nanoseconds > 0) {
-        time.tv_nsec += nanoseconds;
-        while (time.tv_nsec > CELIX_NS_IN_SEC) {
-            time.tv_sec++;
-            time.tv_nsec -= CELIX_NS_IN_SEC;
-        }
-    }
+    double delay = (double)seconds + ((double)nanoseconds / 1000000000);
+    struct timespec time = celixThreadCondition_getDelayedTime(delay);
     return pthread_cond_timedwait(cond, mutex, &time);
 }
 #endif
 
+struct timespec celixThreadCondition_getTime() {
+    return celixThreadCondition_getDelayedTime(0);
+}
+
+struct timespec celixThreadCondition_getDelayedTime(double delayInSeconds) {
+#if __APPLE__
+    struct timeval tv;
+    struct timespec now;
+    gettimeofday(&tv, NULL);

Review Comment:
   I have suffered a lot from the realtime clock in the past.
   A clock synchronization may lead to a program freeze.
   We really have little control over the whole system when our program is
running.
   Though this change does not bite me, since I work solely on Linux, I still
recommend using `celixThreadCondition_timedwaitRelative` to implement
`celixThreadCondition_waitUntil` on macOS.



##########
bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c:
##########
@@ -1127,30 +1102,18 @@ static void 
pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
     celix_arrayList_destroy(setupEntries);
 }
 
-static void *pstm_psaHandlingThread(void *data) {
+static void pstm_psaHandlingEvent(void* data) {
     pubsub_topology_manager_t *manager = data;
 
-    celixThreadMutex_lock(&manager->psaHandling.mutex);
-    bool running = manager->psaHandling.running;
-    celixThreadMutex_unlock(&manager->psaHandling.mutex);
-
-    while (running) {
-        //first teardown -> also if rematch is needed
-        pstm_teardownTopicSenders(manager);
-        pstm_teardownTopicReceivers(manager);
-
-        //then see if any topic sender/receiver are needed
-        pstm_setupTopicSenders(manager);
-        pstm_setupTopicReceivers(manager);
+    //first teardown -> also if rematch is needed
+    pstm_teardownTopicSenders(manager);
+    pstm_teardownTopicReceivers(manager);

Review Comment:
   This will join the topic receiver's worker thread, which performs blocking network IO.
   We cannot afford that kind of blocking in the event loop.



##########
bundles/remote_services/discovery_shm/src/discovery_shmWatcher.c:
##########
@@ -165,20 +167,15 @@ static void* discoveryShmWatcher_run(void* data) {
     }
 
     if (endpointDiscoveryServer_getUrl(discovery->server, &url[0], 
MAX_LOCALNODE_LENGTH) != CELIX_SUCCESS) {
-        snprintf(url, MAX_LOCALNODE_LENGTH, "http://%s:%s/%s", 
DEFAULT_SERVER_IP, DEFAULT_SERVER_PORT, DEFAULT_SERVER_PATH);
+        snprintf(url, MAX_LOCALNODE_LENGTH, "http://%s:%s/%s", 
DEFAULT_SERVER_IP, DEFAULT_SERVER_PORT,
+                 DEFAULT_SERVER_PATH);
     }
 
-    while (watcher->running) {
-        // register own framework
-        if (discoveryShm_set(watcher->shmData, localNodePath, url) != 
CELIX_SUCCESS) {
-            celix_logHelper_log(discovery->loghelper, CELIX_LOG_LEVEL_WARNING, 
"Cannot set local discovery registration.");
-        }
-
-        discoveryShmWatcher_syncEndpoints(discovery);
-        sleep(5);
+    if (discoveryShm_set(watcher->shmData, localNodePath, url) != 
CELIX_SUCCESS) {
+        celix_logHelper_log(discovery->loghelper, CELIX_LOG_LEVEL_WARNING, 
"Cannot set local discovery registration.");
     }
 
-    return NULL;
+    discoveryShmWatcher_syncEndpoints(discovery);

Review Comment:
   Network IO is involved?



##########
bundles/remote_services/discovery_common/src/endpoint_discovery_poller.c:
##########
@@ -290,43 +292,32 @@ celix_status_t 
endpointDiscoveryPoller_poll(endpoint_discovery_poller_t *poller,
        return status;
 }
 
-static void *endpointDiscoveryPoller_performPeriodicPoll(void *data) {
-       endpoint_discovery_poller_t *poller = (endpoint_discovery_poller_t *) 
data;
-
-       useconds_t interval = (useconds_t) (poller->poll_interval * 1000000L);
-
-       while (poller->running) {
-               usleep(interval);
-               celix_status_t status = 
celixThreadMutex_lock(&poller->pollerLock);
+static void endpointDiscoveryPoller_performPeriodicPoll(void *data) {
+    endpoint_discovery_poller_t *poller = (endpoint_discovery_poller_t *) data;
+    celix_status_t status = celixThreadMutex_lock(&poller->pollerLock);
+    if (status != CELIX_SUCCESS) {
+        celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: failed 
to obtain lock; retrying...");
+    } else {
+        hash_map_iterator_pt iterator = 
hashMapIterator_create(poller->entries);
 
-               if (status != CELIX_SUCCESS) {
-            celix_logHelper_warning(*poller->loghelper, "ENDPOINT_POLLER: 
failed to obtain lock; retrying...");
-               } else {
-                       hash_map_iterator_pt iterator = 
hashMapIterator_create(poller->entries);
+        while (hashMapIterator_hasNext(iterator)) {
+            hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator);
 
-                       while (hashMapIterator_hasNext(iterator)) {
-                               hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iterator);
+            char *url = hashMapEntry_getKey(entry);
+            array_list_pt currentEndpoints = hashMapEntry_getValue(entry);
 
-                               char *url = hashMapEntry_getKey(entry);
-                               array_list_pt currentEndpoints = 
hashMapEntry_getValue(entry);
+            endpointDiscoveryPoller_poll(poller, url, currentEndpoints);

Review Comment:
   Network IO is involved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@celix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to