This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/scheduled_event_on_event_thread in repository https://gitbox.apache.org/repos/asf/celix.git
commit e72bb907e5fca5c807dfca5bea3c35850377f21c Author: Pepijn Noltes <[email protected]> AuthorDate: Sun Jun 11 23:51:29 2023 +0200 Fix mem leak in schedule event handling --- .../framework/gtest/src/ScheduledEventTestSuite.cc | 21 +++++-- libs/framework/include/celix/ScheduledEvent.h | 65 +++++++--------------- libs/framework/include/celix_bundle_context.h | 5 +- libs/framework/src/celix_scheduled_event.c | 38 +++++++++++-- libs/framework/src/celix_scheduled_event.h | 14 ++++- libs/framework/src/framework.c | 6 +- 6 files changed, 91 insertions(+), 58 deletions(-) diff --git a/libs/framework/gtest/src/ScheduledEventTestSuite.cc b/libs/framework/gtest/src/ScheduledEventTestSuite.cc index 9e53e69b..609d0ef0 100644 --- a/libs/framework/gtest/src/ScheduledEventTestSuite.cc +++ b/libs/framework/gtest/src/ScheduledEventTestSuite.cc @@ -501,9 +501,22 @@ TEST_F(ScheduledEventTestSuite, TimeoutOnWakeupTest) { auto status = celix_bundleContext_wakeupScheduledEvent(fw->getFrameworkBundleContext()->getCBundleContext(), eventId, 0.001); - EXPECT_EQ(CELIX_TIMEOUT, status); + EXPECT_EQ(CELIX_TIMEOUT, status);; +} - //sleep to ensure the event is processed - //TODO fixme, if removed the tests leaks - std::this_thread::sleep_for(std::chrono::milliseconds{10}); +TEST_F(ScheduledEventTestSuite, CxxCancelOneShotEventBeforeFiredTest) { + auto callback = []() { + FAIL() << "Should not be called"; + }; + + //Given a one shot scheduled event with a initial delay of 1s + auto event = fw->getFrameworkBundleContext()->scheduledEvent() + .withInitialDelay(std::chrono::seconds{1}) + .withCallback(callback) + .build(); + + //When the event is cancelled before the initial delay + event.cancel(); + + //Then the event is not fired and does not leak } \ No newline at end of file diff --git a/libs/framework/include/celix/ScheduledEvent.h b/libs/framework/include/celix/ScheduledEvent.h index 6358f0ca..131316a7 100644 --- a/libs/framework/include/celix/ScheduledEvent.h +++ 
b/libs/framework/include/celix/ScheduledEvent.h @@ -95,6 +95,11 @@ class ScheduledEvent final { } private: + struct Callbacks { + std::function<void()> callback{}; /**< The callback for the scheduled event. */ + std::function<void()> removeCallback{}; /**< The remove callback for the scheduled event. */ + }; + /** * @brief Constructs a scheduled event using the given bundle context and options. * @@ -107,55 +112,27 @@ class ScheduledEvent final { std::function<void()> _removeCallback, celix_scheduled_event_options_t& options) { ctx = std::move(_cCtx); - options.name = _name.c_str(); - configureCallbacks(options, std::move(_callback), std::move(_removeCallback)); - eventId = celix_bundleContext_scheduleEvent(ctx.get(), &options); - } - - /** - * @brief Configure the callbacks for the scheduled event, ensuring the callbacks outlive the scheduled event. - */ - void configureCallbacks(celix_scheduled_event_options_t& options, - std::function<void()> _callback, - std::function<void()> _removeCallback) { isOneShot = options.intervalInSeconds == 0; - if (isOneShot) { - options.callbackData = new std::function<void()>{ - std::move(_callback)}; // to ensure callback outlives the scheduled event object - options.callback = [](void* data) { - auto* cb = static_cast<std::function<void()>*>(data); - (*cb)(); - delete cb; - }; - if (_removeCallback) { - options.removeCallbackData = new std::function<void()>{std::move(_removeCallback)}; - options.removeCallback = [](void* data) { - auto* cb = static_cast<std::function<void()>*>(data); - (*cb)(); - delete cb; - }; - } - } else { - callback = std::move(_callback); - removeCallback = std::move(_removeCallback); - options.callbackData = &callback; - options.callback = [](void* data) { - auto* cb = static_cast<std::function<void()>*>(data); - (*cb)(); - }; - if (removeCallback) { - options.removeCallbackData = &removeCallback; - options.removeCallback = [](void* data) { - auto* cb = static_cast<std::function<void()>*>(data); - 
(*cb)(); - }; + options.name = _name.c_str(); + auto* callbacks = new Callbacks{std::move(_callback), std::move(_removeCallback)}; + options.callbackData = callbacks; + options.callback = [](void* data) { + auto* callbacks = static_cast<Callbacks*>(data); + (callbacks->callback)(); + }; + options.removeCallbackData = callbacks; + options.removeCallback = [](void* data) { + auto* callbacks = static_cast<Callbacks*>(data); + if (callbacks->removeCallback) { + (callbacks->removeCallback)(); } - } + delete callbacks; + }; + + eventId = celix_bundleContext_scheduleEvent(ctx.get(), &options); } std::shared_ptr<celix_bundle_context_t> ctx{}; /**< The bundle context for the scheduled event. */ - std::function<void()> callback{}; /**< The callback for the scheduled event. */ - std::function<void()> removeCallback{}; /**< The remove callback for the scheduled event. */ long eventId{-1}; /**< The ID of the scheduled event. */ bool isOneShot{false}; /**< Whether the scheduled event is a one-shot event. */ }; diff --git a/libs/framework/include/celix_bundle_context.h b/libs/framework/include/celix_bundle_context.h index d9296ff4..b930b868 100644 --- a/libs/framework/include/celix_bundle_context.h +++ b/libs/framework/include/celix_bundle_context.h @@ -1289,8 +1289,9 @@ typedef struct celix_scheduled_event_options { /** * @brief Add a scheduled event to the Celix framework. * - * The scheduled event wil be called repeatedly on the Celix framework event thread using the provided interval. - * For the interval time a monotonic clock is used. The event callback should be fast and non-blocking, otherwise + * The scheduled event will be called on the Celix framework event thread, repeatedly using the provided interval or + * once if only an initial delay is provided. + * The event callback should be relatively fast and the scheduled event interval should be relatively high, otherwise * the framework event queue will be blocked and framework will not function properly.
* * Scheduled events can be scheduled later than the provided initial delay and interval, because they are processed diff --git a/libs/framework/src/celix_scheduled_event.c b/libs/framework/src/celix_scheduled_event.c index 9bad56da..87ae8c7a 100644 --- a/libs/framework/src/celix_scheduled_event.c +++ b/libs/framework/src/celix_scheduled_event.c @@ -62,9 +62,11 @@ struct celix_scheduled_event { callback. */ celix_thread_mutex_t mutex; /**< The mutex to protect the data below. */ - celix_thread_cond_t cond; /**< The condition variable to signal the scheduled event for a changed callCount. */ + celix_thread_cond_t cond; /**< The condition variable to signal the scheduled event for a changed callCount and + isProcessing. */ size_t useCount; /**< The use count of the scheduled event. */ size_t callCount; /**< The call count of the scheduled event. */ + bool isProcessing; /**< Whether the scheduled event is currently being processed. */ struct timespec lastScheduledEventTime; /**< The last scheduled event time of the scheduled event. */ bool processForWakeup; /**< Whether the scheduled event should be processed directly due to a wakeupScheduledEvent call. 
*/ @@ -108,6 +110,7 @@ celix_scheduled_event_t* celix_scheduledEvent_create(celix_framework_logger_t* l event->removedCallback = removedCallback; event->useCount = 1; event->callCount = 0; + event->isProcessing = false; clock_gettime(CLOCK_MONOTONIC, &event->lastScheduledEventTime); event->processForWakeup = false; @@ -197,7 +200,7 @@ void celix_scheduledEvent_process(celix_scheduled_event_t* event, const struct t event->bndId); celixThreadMutex_lock(&event->mutex); - event->useCount += 1; + event->isProcessing = true; celixThreadMutex_unlock(&event->mutex); assert(event->callback != NULL); @@ -205,14 +208,14 @@ void celix_scheduledEvent_process(celix_scheduled_event_t* event, const struct t celixThreadMutex_lock(&event->mutex); event->lastScheduledEventTime = *currentTime; - event->useCount -= 1; + event->isProcessing = false; event->callCount += 1; event->processForWakeup = false; - celixThreadCondition_broadcast(&event->cond); // broadcast for changed callCount + celixThreadCondition_broadcast(&event->cond); // broadcast for changed callCount and isProcessing celixThreadMutex_unlock(&event->mutex); } -bool celix_scheduleEvent_isDone(celix_scheduled_event_t* event) { +bool celix_scheduleEvent_isSingleShotDone(celix_scheduled_event_t* event) { bool isDone = false; celixThreadMutex_lock(&event->mutex); isDone = event->intervalInSeconds == 0 && event->callCount > 0; @@ -256,3 +259,28 @@ celix_status_t celix_scheduledEvent_waitForAtLeastCallCount(celix_scheduled_even } return status; } + +celix_status_t celix_scheduledEvent_waitForProcessing(celix_scheduled_event_t* event) { + celix_status_t status = CELIX_SUCCESS; + struct timespec start = celix_gettime(CLOCK_MONOTONIC); + struct timespec absTimeoutTime = celix_addDelayInSecondsToTime(&start, CELIX_SCHEDULED_EVENT_TIMEOUT_WAIT_FOR_PROCESSING_IN_SECONDS); + celixThreadMutex_lock(&event->mutex); + while (event->isProcessing) { + celixThreadCondition_waitUntil(&event->cond, &event->mutex, &absTimeoutTime); + struct 
timespec now = celix_gettime(CLOCK_MONOTONIC); + if (celix_difftime(&start, &now) > CELIX_SCHEDULED_EVENT_TIMEOUT_WAIT_FOR_PROCESSING_IN_SECONDS) { + status = CELIX_TIMEOUT; + break; + } + } + celixThreadMutex_unlock(&event->mutex); + if (status == CELIX_TIMEOUT) { + fw_log(event->logger, + CELIX_LOG_LEVEL_WARNING, + "Timeout while waiting for scheduled event '%s' (id=%li) for bundle id %li to finish processing", + event->eventName, + event->scheduledEventId, + event->bndId); + } + return status; +} diff --git a/libs/framework/src/celix_scheduled_event.h b/libs/framework/src/celix_scheduled_event.h index 18872e44..1334434f 100644 --- a/libs/framework/src/celix_scheduled_event.h +++ b/libs/framework/src/celix_scheduled_event.h @@ -29,6 +29,8 @@ extern "C" { typedef struct celix_scheduled_event celix_scheduled_event_t; +#define CELIX_SCHEDULED_EVENT_TIMEOUT_WAIT_FOR_PROCESSING_IN_SECONDS 30.0 + /** * @brief Create a scheduled event for the given bundle. * @@ -116,7 +118,7 @@ void celix_scheduledEvent_process(celix_scheduled_event_t* event, const struct t * @param[in] event The event to check. * @return true if the event is a one-shot event and is done. */ -bool celix_scheduleEvent_isDone(celix_scheduled_event_t* event); +bool celix_scheduleEvent_isSingleShotDone(celix_scheduled_event_t* event); /** * @brief Configure a scheduled event for a wakeup, so celix_scheduledEvent_deadlineReached will return true until @@ -139,6 +141,16 @@ celix_status_t celix_scheduledEvent_waitForAtLeastCallCount(celix_scheduled_even size_t targetCallCount, double waitTimeInSeconds); + +/** + * @brief Wait for a scheduled event to be done with processing. + * @param[in] event The event to wait for. + * @note The max time to wait is fixed at CELIX_SCHEDULED_EVENT_TIMEOUT_WAIT_FOR_PROCESSING_IN_SECONDS seconds. + * @return CELIX_SUCCESS if the scheduled event is done with processing, CELIX_TIMEOUT if the scheduled event + * is not done with processing within that time.
+ */ +celix_status_t celix_scheduledEvent_waitForProcessing(celix_scheduled_event_t* event); + #ifdef __cplusplus }; #endif diff --git a/libs/framework/src/framework.c b/libs/framework/src/framework.c index 29a6c770..7e524e52 100644 --- a/libs/framework/src/framework.c +++ b/libs/framework/src/framework.c @@ -1450,7 +1450,7 @@ static double celix_framework_processScheduledEvents(celix_framework_t* fw) { if (callEvent != NULL) { celix_scheduledEvent_process(callEvent, &ts); - if (celix_scheduleEvent_isDone(callEvent)) { + if (celix_scheduleEvent_isSingleShotDone(callEvent)) { celixThreadMutex_lock(&fw->dispatcher.mutex); fw_log(fw->logger, CELIX_LOG_LEVEL_DEBUG, @@ -1502,7 +1502,8 @@ void celix_framework_cleanupScheduledEvents(celix_framework_t* fw, long bndId) { eventName, eventId, eventBndId); - } + } + celix_scheduledEvent_waitForProcessing(removedEvent); celix_scheduledEvent_release(removedEvent); } } while (removedEvent != NULL); @@ -2637,6 +2638,7 @@ bool celix_framework_removeScheduledEvent(celix_framework_t* fw, bool errorIfNot celix_scheduledEvent_getName(event), celix_scheduledEvent_getId(event), celix_scheduledEvent_getBundleId(event)); + celix_scheduledEvent_waitForProcessing(event); celix_scheduledEvent_release(event); return true; }
