This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/scheduled_event_on_event_thread
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 55c803e6f49b0451f20c8d7d163caf35959fe119
Author: Pepijn Noltes <[email protected]>
AuthorDate: Mon Jul 3 20:03:22 2023 +0200

    Revert scheduled event usage in pstm, because network io is used
---
 .../src/pubsub_topology_manager.c                  | 103 ++++++++++++++-------
 .../src/pubsub_topology_manager.h                  |   9 +-
 2 files changed, 77 insertions(+), 35 deletions(-)

diff --git 
a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c 
b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index adb259f7..fc055db0 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -43,7 +43,7 @@
 #define UUID_STR_LEN    37
 #endif
 
-static void pstm_psaHandlingEvent(void* data);
+static void *pstm_psaHandlingThread(void *data);
 
 celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, 
celix_log_helper_t *logHelper, pubsub_topology_manager_t **out) {
     celix_status_t status = CELIX_SUCCESS;
@@ -64,6 +64,9 @@ celix_status_t 
pubsub_topologyManager_create(celix_bundle_context_t *context, ce
     status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL);
     status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL);
     status |= celixThreadMutex_create(&manager->psaMetrics.mutex, NULL);
+    status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL);
+
+    status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL);
 
     manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
     manager->announceEndpointListeners.list = celix_arrayList_create();
@@ -74,30 +77,26 @@ celix_status_t 
pubsub_topologyManager_create(celix_bundle_context_t *context, ce
 
     manager->loghelper = logHelper;
     manager->verbose = celix_bundleContext_getPropertyAsBool(context, 
PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE);
-    long handlingThreadSleepTime = 
celix_bundleContext_getPropertyAsLong(context, 
PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY, 
PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS);
+    unsigned handlingThreadSleepTime = 
celix_bundleContext_getPropertyAsLong(context, 
PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY, 
PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS);
     if ( handlingThreadSleepTime >= 0 ) {
         manager->handlingThreadSleepTime = handlingThreadSleepTime * 1000L;
     }
     manager->handlingThreadSleepTime = 
celix_bundleContext_getPropertyAsLong(context, 
PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_MS_KEY,  
manager->handlingThreadSleepTime);
-
-    double handlingThreadSleepTimeInSeconds = 
((double)manager->handlingThreadSleepTime) / 1000.0;
-    celix_scheduled_event_options_t schedOpts = 
CELIX_EMPTY_SCHEDULED_EVENT_OPTIONS;
-    schedOpts.name = "PubSub TopologyManager PSA Handling";
-    schedOpts.initialDelayInSeconds = handlingThreadSleepTimeInSeconds;
-    schedOpts.intervalInSeconds = handlingThreadSleepTimeInSeconds;
-    schedOpts.callbackData = manager;
-    schedOpts.callback = pstm_psaHandlingEvent;
-    manager->scheduledEventId = celix_bundleContext_scheduleEvent(context, 
&schedOpts);
+    manager->psaHandling.running = true;
+    celixThread_create(&manager->psaHandling.thread, NULL, 
pstm_psaHandlingThread, manager);
+    celixThread_setName(&manager->psaHandling.thread, "PubSub 
TopologyManager");
 
     return status;
 }
 
-void pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager) {
-    if (manager == NULL) {
-        return;
-    }
+celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t 
*manager) {
+    celix_status_t status = CELIX_SUCCESS;
 
-    celix_bundleContext_removeScheduledEvent(manager->context, 
manager->scheduledEventId);
+    celixThreadMutex_lock(&manager->psaHandling.mutex);
+    manager->psaHandling.running = false;
+    celixThreadCondition_broadcast(&manager->psaHandling.cond);
+    celixThreadMutex_unlock(&manager->psaHandling.mutex);
+    celixThread_join(manager->psaHandling.thread, NULL);
 
     celixThreadMutex_lock(&manager->pubsubadmins.mutex);
     hashMap_destroy(manager->pubsubadmins.map, false, false);
@@ -172,6 +171,8 @@ void 
pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager) {
     hashMap_destroy(manager->psaMetrics.map, false, false);
 
     free(manager);
+
+    return status;
 }
 
 void pubsub_topologyManager_psaAdded(void *handle, void *svc, const 
celix_properties_t *props __attribute__((unused))) {
@@ -217,8 +218,11 @@ void pubsub_topologyManager_psaAdded(void *handle, void 
*svc, const celix_proper
                       "A new PSA is added after at least one active 
publisher/provided. \
                 It is preferred that all PSA are started before 
publiser/subscriber are started!\n\
                 Current topic/sender count is %i", needsRematchCount);
-        celix_bundleContext_wakeupScheduledEvent(manager->context, 
manager->scheduledEventId);
+        celixThreadMutex_lock(&manager->psaHandling.mutex);
+        celixThreadCondition_broadcast(&manager->psaHandling.cond);
+        celixThreadMutex_unlock(&manager->psaHandling.mutex);
     }
+
 }
 
 void pubsub_topologyManager_psaRemoved(void *handle, void *svc 
__attribute__((unused)), const celix_properties_t *props) {
@@ -299,7 +303,9 @@ void pubsub_topologyManager_psaRemoved(void *handle, void 
*svc __attribute__((un
     }
     celix_arrayList_destroy(revokedEndpoints);
 
-    celix_bundleContext_wakeupScheduledEvent(manager->context, 
manager->scheduledEventId);
+    celixThreadMutex_lock(&manager->psaHandling.mutex);
+    celixThreadCondition_broadcast(&manager->psaHandling.cond);
+    celixThreadMutex_unlock(&manager->psaHandling.mutex);
 }
 
 void pubsub_topologyManager_subscriberAdded(void *handle, void *svc 
__attribute__((unused)), const celix_properties_t *props, const celix_bundle_t 
*bnd) {
@@ -342,12 +348,16 @@ void pubsub_topologyManager_subscriberAdded(void *handle, 
void *svc __attribute_
         entry->subscriberProperties = celix_properties_copy(props);
         hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, 
entry);
         celix_logHelper_trace(manager->loghelper, "Created new topic receiver 
entry %s", entry->scopeAndTopicKey);
-
-        //new entry, signal psaHandling event to setup topic receiver
-        celix_bundleContext_wakeupScheduledEvent(manager->context, 
manager->scheduledEventId);
     }
+    //signal psa handling thread
+    bool triggerCondition = (entry->usageCount == 1);
     celixThreadMutex_unlock(&manager->topicReceivers.mutex);
 
+    if (triggerCondition) {
+        celixThreadMutex_lock(&manager->psaHandling.mutex);
+        celixThreadCondition_broadcast(&manager->psaHandling.cond);
+        celixThreadMutex_unlock(&manager->psaHandling.mutex);
+    }
 }
 
 void pubsub_topologyManager_subscriberRemoved(void *handle, void *svc 
__attribute__((unused)), const celix_properties_t *props, const celix_bundle_t 
*bnd) {
@@ -382,6 +392,7 @@ void 
pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void *handle, vo
 
     celix_logHelper_trace(manager->loghelper, "Added EndpointListener");
 
+
     //1) retroactively call announceEndpoint for already existing endpoints 
(manager->announcedEndpoints)
     //2) Add listener to manager->announceEndpointListeners
 
@@ -440,6 +451,7 @@ void pubsub_topologyManager_publisherTrackerAdded(void 
*handle, const celix_serv
     //2) update the usage count. if not found
     //3) signal psaHandling thread to find a psa and setup TopicSender
 
+
     char *topicFromFilter = NULL;
     char *scopeFromFilter = NULL;
     pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &scopeFromFilter, 
&topicFromFilter);
@@ -480,12 +492,17 @@ void pubsub_topologyManager_publisherTrackerAdded(void 
*handle, const celix_serv
         entry->bndId = info->bundleId;
         hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
         celix_logHelper_trace(manager->loghelper, "Created new topic sender 
entry %s", entry->scopeAndTopicKey);
-
-        //new entry, so wake up psa handling thread
-        celix_bundleContext_wakeupScheduledEvent(manager->context, 
manager->scheduledEventId);
     }
+    //new entry -> wakeup psaHandling thread
+    bool triggerCondition = (entry->usageCount == 1);
+
     celixThreadMutex_unlock(&manager->topicSenders.mutex);
 
+    if (triggerCondition) {
+        celixThreadMutex_lock(&manager->psaHandling.mutex);
+        celixThreadCondition_broadcast(&manager->psaHandling.cond);
+        celixThreadMutex_unlock(&manager->psaHandling.mutex);
+    }
 }
 
 void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const 
celix_service_tracker_info_t *info) {
@@ -535,6 +552,7 @@ celix_status_t 
pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const
     // 1) See if endpoint is already discovered, if so increase usage count.
     // 1) If not, find matching psa using the matchEndpoint
     // 2) if found call addEndpoint of the matching psa
+    bool triggerCondition = false;
 
     if (manager->verbose) {
         celix_logHelper_trace(manager->loghelper,
@@ -562,10 +580,17 @@ celix_status_t 
pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const
         celix_logHelper_trace(manager->loghelper, "Created new discovered 
endpoint entry %s", uuid);
 
         //waking up psa handling thread to select psa
-        celix_bundleContext_wakeupScheduledEvent(manager->context, 
manager->scheduledEventId);
+        triggerCondition = true;
+
     }
     celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
 
+    if (triggerCondition) {
+        celixThreadMutex_lock(&manager->psaHandling.mutex);
+        celixThreadCondition_broadcast(&manager->psaHandling.cond);
+        celixThreadMutex_unlock(&manager->psaHandling.mutex);
+    }
+
     return status;
 }
 
@@ -1102,18 +1127,30 @@ static void 
pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
     celix_arrayList_destroy(setupEntries);
 }
 
-static void pstm_psaHandlingEvent(void* data) {
+static void *pstm_psaHandlingThread(void *data) {
     pubsub_topology_manager_t *manager = data;
 
-    //first teardown -> also if rematch is needed
-    pstm_teardownTopicSenders(manager);
-    pstm_teardownTopicReceivers(manager);
+    celixThreadMutex_lock(&manager->psaHandling.mutex);
+    bool running = manager->psaHandling.running;
+    celixThreadMutex_unlock(&manager->psaHandling.mutex);
+
+    while (running) {
+        //first teardown -> also if rematch is needed
+        pstm_teardownTopicSenders(manager);
+        pstm_teardownTopicReceivers(manager);
 
-    //then see if any topic sender/receiver are needed
-    pstm_setupTopicSenders(manager);
-    pstm_setupTopicReceivers(manager);
+        //then see if any topic sender/receiver are needed
+        pstm_setupTopicSenders(manager);
+        pstm_setupTopicReceivers(manager);
 
-    pstm_findPsaForEndpoints(manager); //trying to find psa and possible set 
for endpoints with no psa
+        pstm_findPsaForEndpoints(manager); //trying to find psa and possible 
set for endpoints with no psa
+
+        celixThreadMutex_lock(&manager->psaHandling.mutex);
+        celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, 
&manager->psaHandling.mutex, manager->handlingThreadSleepTime  / 1000, 
(manager->handlingThreadSleepTime  % 1000) * 1000000);
+        running = manager->psaHandling.running;
+        celixThreadMutex_unlock(&manager->psaHandling.mutex);
+    }
+    return NULL;
 }
 
 void pubsub_topologyManager_addMetricsService(void * handle, void *svc, const 
celix_properties_t *props) {
diff --git 
a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h 
b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index e573fa57..93df5144 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -67,7 +67,12 @@ typedef struct pubsub_topology_manager {
         hash_map_t *map; //key = svcId, value = pubsub_admin_metrics_service_t*
     } psaMetrics;
 
-    long scheduledEventId;
+    struct {
+        celix_thread_t thread;
+        celix_thread_mutex_t mutex; //protect running and condition
+        celix_thread_cond_t cond;
+        bool running;
+    } psaHandling;
 
     celix_log_helper_t *loghelper;
 
@@ -106,7 +111,7 @@ typedef struct pstm_topic_receiver_or_sender_entry {
 } pstm_topic_receiver_or_sender_entry_t;
 
 celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, 
celix_log_helper_t *logHelper, pubsub_topology_manager_t **manager);
-void pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager);
+celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t 
*manager);
 
 void pubsub_topologyManager_psaAdded(void *handle, void *svc, const 
celix_properties_t *props);
 void pubsub_topologyManager_psaRemoved(void *handle, void *svc, const 
celix_properties_t *props);

Reply via email to