This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/scheduled_event_on_event_thread in repository https://gitbox.apache.org/repos/asf/celix.git
commit c62baa027610dca38bc84eb796db1707602227f2 Author: Pepijn Noltes <[email protected]> AuthorDate: Thu Jun 15 20:24:08 2023 +0200 Replace thread in PSTM for scheduled event --- .../src/pubsub_topology_manager.c | 129 ++++++++++----------- .../src/pubsub_topology_manager.h | 15 +-- 2 files changed, 67 insertions(+), 77 deletions(-) diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c index a204668b..e603276d 100644 --- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c +++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c @@ -43,7 +43,7 @@ #define UUID_STR_LEN 37 #endif -static void *pstm_psaHandlingThread(void *data); +static void pstm_psaHandlingEvent(void* data); celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, celix_log_helper_t *logHelper, pubsub_topology_manager_t **out) { celix_status_t status = CELIX_SUCCESS; @@ -64,9 +64,6 @@ celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, ce status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL); status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL); status |= celixThreadMutex_create(&manager->psaMetrics.mutex, NULL); - status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL); - - status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL); manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); manager->announceEndpointListeners.list = celix_arrayList_create(); @@ -77,26 +74,30 @@ celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, ce manager->loghelper = logHelper; manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE); - unsigned handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY, PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS); + long handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY, PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS); if ( handlingThreadSleepTime >= 0 ) { manager->handlingThreadSleepTime = handlingThreadSleepTime * 1000L; } manager->handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_MS_KEY, manager->handlingThreadSleepTime); - manager->psaHandling.running = true; - celixThread_create(&manager->psaHandling.thread, NULL, pstm_psaHandlingThread, manager); - celixThread_setName(&manager->psaHandling.thread, "PubSub TopologyManager"); + + double handlingThreadSleepTimeInSeconds = ((double)manager->handlingThreadSleepTime) / 1000.0; + celix_scheduled_event_options_t schedOpts = CELIX_EMPTY_SCHEDULED_EVENT_OPTIONS; + schedOpts.name = "PubSub TopologyManager PSA Handling"; + schedOpts.initialDelayInSeconds = handlingThreadSleepTimeInSeconds; + schedOpts.intervalInSeconds = handlingThreadSleepTimeInSeconds; + schedOpts.callbackData = manager; + schedOpts.callback = pstm_psaHandlingEvent; + manager->scheduledEventId = celix_bundleContext_scheduleEvent(context, &schedOpts); return status; } -celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager) { - celix_status_t status = CELIX_SUCCESS; +void pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager) { + if (manager == NULL) { + return; + } - celixThreadMutex_lock(&manager->psaHandling.mutex); - manager->psaHandling.running = false; - celixThreadCondition_broadcast(&manager->psaHandling.cond); - celixThreadMutex_unlock(&manager->psaHandling.mutex); - celixThread_join(manager->psaHandling.thread, NULL); + celix_bundleContext_removeScheduledEvent(manager->context, manager->scheduledEventId); celixThreadMutex_lock(&manager->pubsubadmins.mutex); hashMap_destroy(manager->pubsubadmins.map, false, false); @@ -171,8 +172,6 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager hashMap_destroy(manager->psaMetrics.map, false, false); free(manager); - - return status; } void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_properties_t *props __attribute__((unused))) { @@ -218,11 +217,8 @@ void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_proper "A new PSA is added after at least one active publisher/provided. \ It is preferred that all PSA are started before publiser/subscriber are started!\n\ Current topic/sender count is %i", needsRematchCount); - celixThreadMutex_lock(&manager->psaHandling.mutex); - celixThreadCondition_broadcast(&manager->psaHandling.cond); - celixThreadMutex_unlock(&manager->psaHandling.mutex); + celix_bundleContext_wakeupScheduledEvent(manager->context, manager->scheduledEventId); } - } void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) { @@ -303,9 +299,7 @@ void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((un } celix_arrayList_destroy(revokedEndpoints); - celixThreadMutex_lock(&manager->psaHandling.mutex); - celixThreadCondition_broadcast(&manager->psaHandling.cond); - celixThreadMutex_unlock(&manager->psaHandling.mutex); + celix_bundleContext_wakeupScheduledEvent(manager->context, manager->scheduledEventId); } void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) { @@ -348,16 +342,12 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_ entry->subscriberProperties = celix_properties_copy(props); hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry); celix_logHelper_trace(manager->loghelper, "Created new topic receiver entry %s", entry->scopeAndTopicKey); + + //new entry, signal psaHandling event to setup topic receiver + celix_bundleContext_wakeupScheduledEvent(manager->context, manager->scheduledEventId); } - //signal psa handling thread - bool triggerCondition = (entry->usageCount == 1); celixThreadMutex_unlock(&manager->topicReceivers.mutex); - if (triggerCondition) { - celixThreadMutex_lock(&manager->psaHandling.mutex); - celixThreadCondition_broadcast(&manager->psaHandling.cond); - celixThreadMutex_unlock(&manager->psaHandling.mutex); - } } void pubsub_topologyManager_subscriberRemoved(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) { @@ -392,7 +382,6 @@ void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void *handle, vo celix_logHelper_trace(manager->loghelper, "Added EndpointListener"); - //1) retroactively call announceEndpoint for already existing endpoints (manager->announcedEndpoints) //2) Add listener to manager->announceEndpointListeners @@ -451,7 +440,6 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv //2) update the usage count. if not found //3) signal psaHandling thread to find a psa and setup TopicSender - char *topicFromFilter = NULL; char *scopeFromFilter = NULL; pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &scopeFromFilter, &topicFromFilter); @@ -492,17 +480,12 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv entry->bndId = info->bundleId; hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry); celix_logHelper_trace(manager->loghelper, "Created new topic sender entry %s", entry->scopeAndTopicKey); - } - //new entry -> wakeup psaHandling thread - bool triggerCondition = (entry->usageCount == 1); + //new entry, so wake up psa handling thread + celix_bundleContext_wakeupScheduledEvent(manager->context, manager->scheduledEventId); + } celixThreadMutex_unlock(&manager->topicSenders.mutex); - if (triggerCondition) { - celixThreadMutex_lock(&manager->psaHandling.mutex); - celixThreadCondition_broadcast(&manager->psaHandling.cond); - celixThreadMutex_unlock(&manager->psaHandling.mutex); - } } void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_service_tracker_info_t *info) { @@ -552,7 +535,6 @@ celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const // 1) See if endpoint is already discovered, if so increase usage count. // 1) If not, find matching psa using the matchEndpoint // 2) if found call addEndpoint of the matching psa - bool triggerCondition = false; if (manager->verbose) { celix_logHelper_trace(manager->loghelper, @@ -580,17 +562,10 @@ celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_logHelper_trace(manager->loghelper, "Created new discovered endpoint entry %s", uuid); //waking up psa handling thread to select psa - triggerCondition = true; - + celix_bundleContext_wakeupScheduledEvent(manager->context, manager->scheduledEventId); } celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); - if (triggerCondition) { - celixThreadMutex_lock(&manager->psaHandling.mutex); - celixThreadCondition_broadcast(&manager->psaHandling.cond); - celixThreadMutex_unlock(&manager->psaHandling.mutex); - } - return status; } @@ -1127,32 +1102,46 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) { celix_arrayList_destroy(setupEntries); } -static void *pstm_psaHandlingThread(void *data) { +static void pstm_psaHandlingEvent(void* data) { pubsub_topology_manager_t *manager = data; - celixThreadMutex_lock(&manager->psaHandling.mutex); - bool running = manager->psaHandling.running; - celixThreadMutex_unlock(&manager->psaHandling.mutex); - - while (running) { - //first teardown -> also if rematch is needed - pstm_teardownTopicSenders(manager); - pstm_teardownTopicReceivers(manager); - - //then see if any topic sender/receiver are needed - pstm_setupTopicSenders(manager); - pstm_setupTopicReceivers(manager); + //first teardown -> also if rematch is needed + pstm_teardownTopicSenders(manager); + pstm_teardownTopicReceivers(manager); - pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa + //then see if any topic sender/receiver are needed + pstm_setupTopicSenders(manager); + pstm_setupTopicReceivers(manager); - celixThreadMutex_lock(&manager->psaHandling.mutex); - celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, manager->handlingThreadSleepTime / 1000, (manager->handlingThreadSleepTime % 1000) * 1000000); - running = manager->psaHandling.running; - celixThreadMutex_unlock(&manager->psaHandling.mutex); - } - return NULL; + pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa } +//static void *pstm_psaHandlingThread(void *data) { +// pubsub_topology_manager_t *manager = data; +// +// celixThreadMutex_lock(&manager->psaHandling.mutex); +// bool running = manager->psaHandling.running; +// celixThreadMutex_unlock(&manager->psaHandling.mutex); +// +// while (running) { +// //first teardown -> also if rematch is needed +// pstm_teardownTopicSenders(manager); +// pstm_teardownTopicReceivers(manager); +// +// //then see if any topic sender/receiver are needed +// pstm_setupTopicSenders(manager); +// pstm_setupTopicReceivers(manager); +// +// pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa +// +// celixThreadMutex_lock(&manager->psaHandling.mutex); +// celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, manager->handlingThreadSleepTime / 1000, (manager->handlingThreadSleepTime % 1000) * 1000000); +// running = manager->psaHandling.running; +// celixThreadMutex_unlock(&manager->psaHandling.mutex); +// } +// return NULL; +//} + void pubsub_topologyManager_addMetricsService(void * handle, void *svc, const celix_properties_t *props) { pubsub_topology_manager_t *manager = handle; long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L); diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h index 93df5144..f89353e9 100644 --- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h +++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h @@ -67,12 +67,13 @@ typedef struct pubsub_topology_manager { hash_map_t *map; //key = svcId, value = pubsub_admin_metrics_service_t* } psaMetrics; - struct { - celix_thread_t thread; - celix_thread_mutex_t mutex; //protect running and condition - celix_thread_cond_t cond; - bool running; - } psaHandling; +// struct { +// celix_thread_t thread; +// celix_thread_mutex_t mutex; //protect running and condition +// celix_thread_cond_t cond; +// bool running; +// } psaHandling; + long scheduledEventId; celix_log_helper_t *loghelper; @@ -111,7 +112,7 @@ typedef struct pstm_topic_receiver_or_sender_entry { } pstm_topic_receiver_or_sender_entry_t; celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, celix_log_helper_t *logHelper, pubsub_topology_manager_t **manager); -celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager); +void pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager); void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_properties_t *props); void pubsub_topologyManager_psaRemoved(void *handle, void *svc, const celix_properties_t *props);
