This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/scheduled_event_on_event_thread in repository https://gitbox.apache.org/repos/asf/celix.git
commit 55c803e6f49b0451f20c8d7d163caf35959fe119 Author: Pepijn Noltes <[email protected]> AuthorDate: Mon Jul 3 20:03:22 2023 +0200 Revert scheduled event usage in pstm, because network io is used --- .../src/pubsub_topology_manager.c | 103 ++++++++++++++------- .../src/pubsub_topology_manager.h | 9 +- 2 files changed, 77 insertions(+), 35 deletions(-) diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c index adb259f7..fc055db0 100644 --- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c +++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c @@ -43,7 +43,7 @@ #define UUID_STR_LEN 37 #endif -static void pstm_psaHandlingEvent(void* data); +static void *pstm_psaHandlingThread(void *data); celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, celix_log_helper_t *logHelper, pubsub_topology_manager_t **out) { celix_status_t status = CELIX_SUCCESS; @@ -64,6 +64,9 @@ celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, ce status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL); status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL); status |= celixThreadMutex_create(&manager->psaMetrics.mutex, NULL); + status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL); + + status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL); manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); manager->announceEndpointListeners.list = celix_arrayList_create(); @@ -74,30 +77,26 @@ celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, ce manager->loghelper = logHelper; manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE); - long handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY, PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS); + unsigned handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY, PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS); if ( handlingThreadSleepTime >= 0 ) { manager->handlingThreadSleepTime = handlingThreadSleepTime * 1000L; } manager->handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_MS_KEY, manager->handlingThreadSleepTime); - - double handlingThreadSleepTimeInSeconds = ((double)manager->handlingThreadSleepTime) / 1000.0; - celix_scheduled_event_options_t schedOpts = CELIX_EMPTY_SCHEDULED_EVENT_OPTIONS; - schedOpts.name = "PubSub TopologyManager PSA Handling"; - schedOpts.initialDelayInSeconds = handlingThreadSleepTimeInSeconds; - schedOpts.intervalInSeconds = handlingThreadSleepTimeInSeconds; - schedOpts.callbackData = manager; - schedOpts.callback = pstm_psaHandlingEvent; - manager->scheduledEventId = celix_bundleContext_scheduleEvent(context, &schedOpts); + manager->psaHandling.running = true; + celixThread_create(&manager->psaHandling.thread, NULL, pstm_psaHandlingThread, manager); + celixThread_setName(&manager->psaHandling.thread, "PubSub TopologyManager"); return status; } -void pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager) { - if (manager == NULL) { - return; - } +celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager) { + celix_status_t status = CELIX_SUCCESS; - celix_bundleContext_removeScheduledEvent(manager->context, manager->scheduledEventId); + celixThreadMutex_lock(&manager->psaHandling.mutex); + manager->psaHandling.running = false; + celixThreadCondition_broadcast(&manager->psaHandling.cond); + celixThreadMutex_unlock(&manager->psaHandling.mutex); + celixThread_join(manager->psaHandling.thread, NULL); celixThreadMutex_lock(&manager->pubsubadmins.mutex); hashMap_destroy(manager->pubsubadmins.map, false, false); @@ -172,6 +171,8 @@ void pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager) { hashMap_destroy(manager->psaMetrics.map, false, false); free(manager); + + return status; } void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_properties_t *props __attribute__((unused))) { @@ -217,8 +218,11 @@ void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_proper "A new PSA is added after at least one active publisher/provided. \ It is preferred that all PSA are started before publiser/subscriber are started!\n\ Current topic/sender count is %i", needsRematchCount); - celix_bundleContext_wakeupScheduledEvent(manager->context, manager->scheduledEventId); + celixThreadMutex_lock(&manager->psaHandling.mutex); + celixThreadCondition_broadcast(&manager->psaHandling.cond); + celixThreadMutex_unlock(&manager->psaHandling.mutex); } + } void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) { @@ -299,7 +303,9 @@ void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((un } celix_arrayList_destroy(revokedEndpoints); - celix_bundleContext_wakeupScheduledEvent(manager->context, manager->scheduledEventId); + celixThreadMutex_lock(&manager->psaHandling.mutex); + celixThreadCondition_broadcast(&manager->psaHandling.cond); + celixThreadMutex_unlock(&manager->psaHandling.mutex); } void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) { @@ -342,12 +348,16 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_ entry->subscriberProperties = celix_properties_copy(props); hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry); celix_logHelper_trace(manager->loghelper, "Created new topic receiver entry %s", entry->scopeAndTopicKey); - - //new entry, signal psaHandling event to setup topic receiver - celix_bundleContext_wakeupScheduledEvent(manager->context, manager->scheduledEventId); } + //signal psa handling thread + bool triggerCondition = (entry->usageCount == 1); celixThreadMutex_unlock(&manager->topicReceivers.mutex); + if (triggerCondition) { + celixThreadMutex_lock(&manager->psaHandling.mutex); + celixThreadCondition_broadcast(&manager->psaHandling.cond); + celixThreadMutex_unlock(&manager->psaHandling.mutex); + } } void pubsub_topologyManager_subscriberRemoved(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) { @@ -382,6 +392,7 @@ void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void *handle, vo celix_logHelper_trace(manager->loghelper, "Added EndpointListener"); + //1) retroactively call announceEndpoint for already existing endpoints (manager->announcedEndpoints) //2) Add listener to manager->announceEndpointListeners @@ -440,6 +451,7 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv //2) update the usage count. if not found //3) signal psaHandling thread to find a psa and setup TopicSender + char *topicFromFilter = NULL; char *scopeFromFilter = NULL; pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &scopeFromFilter, &topicFromFilter); @@ -480,12 +492,17 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv entry->bndId = info->bundleId; hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry); celix_logHelper_trace(manager->loghelper, "Created new topic sender entry %s", entry->scopeAndTopicKey); - - //new entry, so wake up psa handling thread - celix_bundleContext_wakeupScheduledEvent(manager->context, manager->scheduledEventId); } + //new entry -> wakeup psaHandling thread + bool triggerCondition = (entry->usageCount == 1); + celixThreadMutex_unlock(&manager->topicSenders.mutex); + if (triggerCondition) { + celixThreadMutex_lock(&manager->psaHandling.mutex); + celixThreadCondition_broadcast(&manager->psaHandling.cond); + celixThreadMutex_unlock(&manager->psaHandling.mutex); + } } void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_service_tracker_info_t *info) { @@ -535,6 +552,7 @@ celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const // 1) See if endpoint is already discovered, if so increase usage count. // 1) If not, find matching psa using the matchEndpoint // 2) if found call addEndpoint of the matching psa + bool triggerCondition = false; if (manager->verbose) { celix_logHelper_trace(manager->loghelper, @@ -562,10 +580,17 @@ celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_logHelper_trace(manager->loghelper, "Created new discovered endpoint entry %s", uuid); //waking up psa handling thread to select psa - celix_bundleContext_wakeupScheduledEvent(manager->context, manager->scheduledEventId); + triggerCondition = true; + } celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); + if (triggerCondition) { + celixThreadMutex_lock(&manager->psaHandling.mutex); + celixThreadCondition_broadcast(&manager->psaHandling.cond); + celixThreadMutex_unlock(&manager->psaHandling.mutex); + } + return status; } @@ -1102,18 +1127,30 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) { celix_arrayList_destroy(setupEntries); } -static void pstm_psaHandlingEvent(void* data) { +static void *pstm_psaHandlingThread(void *data) { pubsub_topology_manager_t *manager = data; - //first teardown -> also if rematch is needed - pstm_teardownTopicSenders(manager); - pstm_teardownTopicReceivers(manager); + celixThreadMutex_lock(&manager->psaHandling.mutex); + bool running = manager->psaHandling.running; + celixThreadMutex_unlock(&manager->psaHandling.mutex); + + while (running) { + //first teardown -> also if rematch is needed + pstm_teardownTopicSenders(manager); + pstm_teardownTopicReceivers(manager); - //then see if any topic sender/receiver are needed - pstm_setupTopicSenders(manager); - pstm_setupTopicReceivers(manager); + //then see if any topic sender/receiver are needed + pstm_setupTopicSenders(manager); + pstm_setupTopicReceivers(manager); - pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa + pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa + + celixThreadMutex_lock(&manager->psaHandling.mutex); + celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, manager->handlingThreadSleepTime / 1000, (manager->handlingThreadSleepTime % 1000) * 1000000); + running = manager->psaHandling.running; + celixThreadMutex_unlock(&manager->psaHandling.mutex); + } + return NULL; } void pubsub_topologyManager_addMetricsService(void * handle, void *svc, const celix_properties_t *props) { diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h index e573fa57..93df5144 100644 --- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h +++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h @@ -67,7 +67,12 @@ typedef struct pubsub_topology_manager { hash_map_t *map; //key = svcId, value = pubsub_admin_metrics_service_t* } psaMetrics; - long scheduledEventId; + struct { + celix_thread_t thread; + celix_thread_mutex_t mutex; //protect running and condition + celix_thread_cond_t cond; + bool running; + } psaHandling; celix_log_helper_t *loghelper; @@ -106,7 +111,7 @@ typedef struct pstm_topic_receiver_or_sender_entry { } pstm_topic_receiver_or_sender_entry_t; celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, celix_log_helper_t *logHelper, pubsub_topology_manager_t **manager); -void pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager); +celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager); void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_properties_t *props); void pubsub_topologyManager_psaRemoved(void *handle, void *svc, const celix_properties_t *props);
