This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/scheduled_event_on_event_thread
in repository https://gitbox.apache.org/repos/asf/celix.git

commit c62baa027610dca38bc84eb796db1707602227f2
Author: Pepijn Noltes <[email protected]>
AuthorDate: Thu Jun 15 20:24:08 2023 +0200

    Replace thread in PSTM for scheduled event
---
 .../src/pubsub_topology_manager.c                  | 129 ++++++++++-----------
 .../src/pubsub_topology_manager.h                  |  15 +--
 2 files changed, 67 insertions(+), 77 deletions(-)

diff --git 
a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c 
b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index a204668b..e603276d 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -43,7 +43,7 @@
 #define UUID_STR_LEN    37
 #endif
 
-static void *pstm_psaHandlingThread(void *data);
+static void pstm_psaHandlingEvent(void* data);
 
 celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, 
celix_log_helper_t *logHelper, pubsub_topology_manager_t **out) {
     celix_status_t status = CELIX_SUCCESS;
@@ -64,9 +64,6 @@ celix_status_t 
pubsub_topologyManager_create(celix_bundle_context_t *context, ce
     status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL);
     status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL);
     status |= celixThreadMutex_create(&manager->psaMetrics.mutex, NULL);
-    status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL);
-
-    status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL);
 
     manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
     manager->announceEndpointListeners.list = celix_arrayList_create();
@@ -77,26 +74,30 @@ celix_status_t 
pubsub_topologyManager_create(celix_bundle_context_t *context, ce
 
     manager->loghelper = logHelper;
     manager->verbose = celix_bundleContext_getPropertyAsBool(context, 
PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE);
-    unsigned handlingThreadSleepTime = 
celix_bundleContext_getPropertyAsLong(context, 
PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY, 
PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS);
+    long handlingThreadSleepTime = 
celix_bundleContext_getPropertyAsLong(context, 
PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY, 
PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS);
     if ( handlingThreadSleepTime >= 0 ) {
         manager->handlingThreadSleepTime = handlingThreadSleepTime * 1000L;
     }
     manager->handlingThreadSleepTime = 
celix_bundleContext_getPropertyAsLong(context, 
PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_MS_KEY,  
manager->handlingThreadSleepTime);
-    manager->psaHandling.running = true;
-    celixThread_create(&manager->psaHandling.thread, NULL, 
pstm_psaHandlingThread, manager);
-    celixThread_setName(&manager->psaHandling.thread, "PubSub 
TopologyManager");
+
+    double handlingThreadSleepTimeInSeconds = 
((double)manager->handlingThreadSleepTime) / 1000.0;
+    celix_scheduled_event_options_t schedOpts = 
CELIX_EMPTY_SCHEDULED_EVENT_OPTIONS;
+    schedOpts.name = "PubSub TopologyManager PSA Handling";
+    schedOpts.initialDelayInSeconds = handlingThreadSleepTimeInSeconds;
+    schedOpts.intervalInSeconds = handlingThreadSleepTimeInSeconds;
+    schedOpts.callbackData = manager;
+    schedOpts.callback = pstm_psaHandlingEvent;
+    manager->scheduledEventId = celix_bundleContext_scheduleEvent(context, 
&schedOpts);
 
     return status;
 }
 
-celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t 
*manager) {
-    celix_status_t status = CELIX_SUCCESS;
+void pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager) {
+    if (manager == NULL) {
+        return;
+    }
 
-    celixThreadMutex_lock(&manager->psaHandling.mutex);
-    manager->psaHandling.running = false;
-    celixThreadCondition_broadcast(&manager->psaHandling.cond);
-    celixThreadMutex_unlock(&manager->psaHandling.mutex);
-    celixThread_join(manager->psaHandling.thread, NULL);
+    celix_bundleContext_removeScheduledEvent(manager->context, 
manager->scheduledEventId);
 
     celixThreadMutex_lock(&manager->pubsubadmins.mutex);
     hashMap_destroy(manager->pubsubadmins.map, false, false);
@@ -171,8 +172,6 @@ celix_status_t 
pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager
     hashMap_destroy(manager->psaMetrics.map, false, false);
 
     free(manager);
-
-    return status;
 }
 
 void pubsub_topologyManager_psaAdded(void *handle, void *svc, const 
celix_properties_t *props __attribute__((unused))) {
@@ -218,11 +217,8 @@ void pubsub_topologyManager_psaAdded(void *handle, void 
*svc, const celix_proper
                       "A new PSA is added after at least one active 
publisher/provided. \
                 It is preferred that all PSA are started before 
publiser/subscriber are started!\n\
                 Current topic/sender count is %i", needsRematchCount);
-        celixThreadMutex_lock(&manager->psaHandling.mutex);
-        celixThreadCondition_broadcast(&manager->psaHandling.cond);
-        celixThreadMutex_unlock(&manager->psaHandling.mutex);
+        celix_bundleContext_wakeupScheduledEvent(manager->context, 
manager->scheduledEventId);
     }
-
 }
 
 void pubsub_topologyManager_psaRemoved(void *handle, void *svc 
__attribute__((unused)), const celix_properties_t *props) {
@@ -303,9 +299,7 @@ void pubsub_topologyManager_psaRemoved(void *handle, void 
*svc __attribute__((un
     }
     celix_arrayList_destroy(revokedEndpoints);
 
-    celixThreadMutex_lock(&manager->psaHandling.mutex);
-    celixThreadCondition_broadcast(&manager->psaHandling.cond);
-    celixThreadMutex_unlock(&manager->psaHandling.mutex);
+    celix_bundleContext_wakeupScheduledEvent(manager->context, 
manager->scheduledEventId);
 }
 
 void pubsub_topologyManager_subscriberAdded(void *handle, void *svc 
__attribute__((unused)), const celix_properties_t *props, const celix_bundle_t 
*bnd) {
@@ -348,16 +342,12 @@ void pubsub_topologyManager_subscriberAdded(void *handle, 
void *svc __attribute_
         entry->subscriberProperties = celix_properties_copy(props);
         hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, 
entry);
         celix_logHelper_trace(manager->loghelper, "Created new topic receiver 
entry %s", entry->scopeAndTopicKey);
+
+        //new entry, signal psaHandling event to setup topic receiver
+        celix_bundleContext_wakeupScheduledEvent(manager->context, 
manager->scheduledEventId);
     }
-    //signal psa handling thread
-    bool triggerCondition = (entry->usageCount == 1);
     celixThreadMutex_unlock(&manager->topicReceivers.mutex);
 
-    if (triggerCondition) {
-        celixThreadMutex_lock(&manager->psaHandling.mutex);
-        celixThreadCondition_broadcast(&manager->psaHandling.cond);
-        celixThreadMutex_unlock(&manager->psaHandling.mutex);
-    }
 }
 
 void pubsub_topologyManager_subscriberRemoved(void *handle, void *svc 
__attribute__((unused)), const celix_properties_t *props, const celix_bundle_t 
*bnd) {
@@ -392,7 +382,6 @@ void 
pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void *handle, vo
 
     celix_logHelper_trace(manager->loghelper, "Added EndpointListener");
 
-
     //1) retroactively call announceEndpoint for already existing endpoints 
(manager->announcedEndpoints)
     //2) Add listener to manager->announceEndpointListeners
 
@@ -451,7 +440,6 @@ void pubsub_topologyManager_publisherTrackerAdded(void 
*handle, const celix_serv
     //2) update the usage count. if not found
     //3) signal psaHandling thread to find a psa and setup TopicSender
 
-
     char *topicFromFilter = NULL;
     char *scopeFromFilter = NULL;
     pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &scopeFromFilter, 
&topicFromFilter);
@@ -492,17 +480,12 @@ void pubsub_topologyManager_publisherTrackerAdded(void 
*handle, const celix_serv
         entry->bndId = info->bundleId;
         hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
         celix_logHelper_trace(manager->loghelper, "Created new topic sender 
entry %s", entry->scopeAndTopicKey);
-    }
-    //new entry -> wakeup psaHandling thread
-    bool triggerCondition = (entry->usageCount == 1);
 
+        //new entry, so wake up psa handling thread
+        celix_bundleContext_wakeupScheduledEvent(manager->context, 
manager->scheduledEventId);
+    }
     celixThreadMutex_unlock(&manager->topicSenders.mutex);
 
-    if (triggerCondition) {
-        celixThreadMutex_lock(&manager->psaHandling.mutex);
-        celixThreadCondition_broadcast(&manager->psaHandling.cond);
-        celixThreadMutex_unlock(&manager->psaHandling.mutex);
-    }
 }
 
 void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const 
celix_service_tracker_info_t *info) {
@@ -552,7 +535,6 @@ celix_status_t 
pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const
     // 1) See if endpoint is already discovered, if so increase usage count.
     // 1) If not, find matching psa using the matchEndpoint
     // 2) if found call addEndpoint of the matching psa
-    bool triggerCondition = false;
 
     if (manager->verbose) {
         celix_logHelper_trace(manager->loghelper,
@@ -580,17 +562,10 @@ celix_status_t 
pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const
         celix_logHelper_trace(manager->loghelper, "Created new discovered 
endpoint entry %s", uuid);
 
         //waking up psa handling thread to select psa
-        triggerCondition = true;
-
+        celix_bundleContext_wakeupScheduledEvent(manager->context, 
manager->scheduledEventId);
     }
     celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
 
-    if (triggerCondition) {
-        celixThreadMutex_lock(&manager->psaHandling.mutex);
-        celixThreadCondition_broadcast(&manager->psaHandling.cond);
-        celixThreadMutex_unlock(&manager->psaHandling.mutex);
-    }
-
     return status;
 }
 
@@ -1127,32 +1102,46 @@ static void 
pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
     celix_arrayList_destroy(setupEntries);
 }
 
-static void *pstm_psaHandlingThread(void *data) {
+static void pstm_psaHandlingEvent(void* data) {
     pubsub_topology_manager_t *manager = data;
 
-    celixThreadMutex_lock(&manager->psaHandling.mutex);
-    bool running = manager->psaHandling.running;
-    celixThreadMutex_unlock(&manager->psaHandling.mutex);
-
-    while (running) {
-        //first teardown -> also if rematch is needed
-        pstm_teardownTopicSenders(manager);
-        pstm_teardownTopicReceivers(manager);
-
-        //then see if any topic sender/receiver are needed
-        pstm_setupTopicSenders(manager);
-        pstm_setupTopicReceivers(manager);
+    //first teardown -> also if rematch is needed
+    pstm_teardownTopicSenders(manager);
+    pstm_teardownTopicReceivers(manager);
 
-        pstm_findPsaForEndpoints(manager); //trying to find psa and possible 
set for endpoints with no psa
+    //then see if any topic sender/receiver are needed
+    pstm_setupTopicSenders(manager);
+    pstm_setupTopicReceivers(manager);
 
-        celixThreadMutex_lock(&manager->psaHandling.mutex);
-        celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, 
&manager->psaHandling.mutex, manager->handlingThreadSleepTime  / 1000, 
(manager->handlingThreadSleepTime  % 1000) * 1000000);
-        running = manager->psaHandling.running;
-        celixThreadMutex_unlock(&manager->psaHandling.mutex);
-    }
-    return NULL;
+    pstm_findPsaForEndpoints(manager); //trying to find psa and possible set 
for endpoints with no psa
 }
 
+//static void *pstm_psaHandlingThread(void *data) {
+//    pubsub_topology_manager_t *manager = data;
+//
+//    celixThreadMutex_lock(&manager->psaHandling.mutex);
+//    bool running = manager->psaHandling.running;
+//    celixThreadMutex_unlock(&manager->psaHandling.mutex);
+//
+//    while (running) {
+//        //first teardown -> also if rematch is needed
+//        pstm_teardownTopicSenders(manager);
+//        pstm_teardownTopicReceivers(manager);
+//
+//        //then see if any topic sender/receiver are needed
+//        pstm_setupTopicSenders(manager);
+//        pstm_setupTopicReceivers(manager);
+//
+//        pstm_findPsaForEndpoints(manager); //trying to find psa and possible 
set for endpoints with no psa
+//
+//        celixThreadMutex_lock(&manager->psaHandling.mutex);
+//        celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, 
&manager->psaHandling.mutex, manager->handlingThreadSleepTime  / 1000, 
(manager->handlingThreadSleepTime  % 1000) * 1000000);
+//        running = manager->psaHandling.running;
+//        celixThreadMutex_unlock(&manager->psaHandling.mutex);
+//    }
+//    return NULL;
+//}
+
 void pubsub_topologyManager_addMetricsService(void * handle, void *svc, const 
celix_properties_t *props) {
     pubsub_topology_manager_t *manager = handle;
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, 
-1L);
diff --git 
a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h 
b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 93df5144..f89353e9 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -67,12 +67,13 @@ typedef struct pubsub_topology_manager {
         hash_map_t *map; //key = svcId, value = pubsub_admin_metrics_service_t*
     } psaMetrics;
 
-    struct {
-        celix_thread_t thread;
-        celix_thread_mutex_t mutex; //protect running and condition
-        celix_thread_cond_t cond;
-        bool running;
-    } psaHandling;
+//    struct {
+//        celix_thread_t thread;
+//        celix_thread_mutex_t mutex; //protect running and condition
+//        celix_thread_cond_t cond;
+//        bool running;
+//    } psaHandling;
+    long scheduledEventId;
 
     celix_log_helper_t *loghelper;
 
@@ -111,7 +112,7 @@ typedef struct pstm_topic_receiver_or_sender_entry {
 } pstm_topic_receiver_or_sender_entry_t;
 
 celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, 
celix_log_helper_t *logHelper, pubsub_topology_manager_t **manager);
-celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t 
*manager);
+void pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager);
 
 void pubsub_topologyManager_psaAdded(void *handle, void *svc, const 
celix_properties_t *props);
 void pubsub_topologyManager_psaRemoved(void *handle, void *svc, const 
celix_properties_t *props);

Reply via email to