This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/scheduled_event_on_event_thread in repository https://gitbox.apache.org/repos/asf/celix.git
commit f2fd288a6317f2f46bd66933c2b7a7451b4c6453 Author: Pepijn Noltes <[email protected]> AuthorDate: Mon Jun 12 23:36:15 2023 +0200 Refactor scheduled event removal handling --- .../framework/gtest/src/ScheduledEventTestSuite.cc | 123 ++++++++++++++++- libs/framework/include/celix/ScheduledEvent.h | 30 ++++- libs/framework/include/celix_bundle_context.h | 44 ++++-- libs/framework/src/bundle_context.c | 17 ++- libs/framework/src/celix_scheduled_event.c | 97 +++++++++----- libs/framework/src/celix_scheduled_event.h | 43 +++--- libs/framework/src/framework.c | 148 +++++++++++++-------- libs/framework/src/framework_private.h | 16 ++- 8 files changed, 385 insertions(+), 133 deletions(-) diff --git a/libs/framework/gtest/src/ScheduledEventTestSuite.cc b/libs/framework/gtest/src/ScheduledEventTestSuite.cc index 609d0ef0..f4459b8c 100644 --- a/libs/framework/gtest/src/ScheduledEventTestSuite.cc +++ b/libs/framework/gtest/src/ScheduledEventTestSuite.cc @@ -26,7 +26,6 @@ class ScheduledEventTestSuite : public ::testing::Test { public: ScheduledEventTestSuite() { fw = celix::createFramework({{"CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL", "trace"}}); } - std::shared_ptr<celix::Framework> fw{}; }; @@ -132,7 +131,7 @@ TEST_F(ScheduledEventTestSuite, ManyScheduledEventTest) { std::vector<long> eventIds{}; // When 1000 scheduled events are with a random interval between 1 and 59 ms - for (int i = 0; i < 1000; ++i) { + for (int i = 0; i < 100; ++i) { // When I create a scheduled event with a 10ms delay and a 20 ms interval celix_scheduled_event_options_t opts{}; opts.name = "Scheduled event test"; @@ -221,6 +220,11 @@ TEST_F(ScheduledEventTestSuite, InvalidOptionsAndArgumentsTest) { scheduledEventId = celix_framework_scheduleEvent( ctx->getFramework()->getCFramework(), 404, nullptr, 0.0, 0.0, nullptr, [](void*) { /*nop*/ }, nullptr, nullptr); EXPECT_EQ(scheduledEventId, -1); + + // celix_framework_waitForScheduledEvent with an invalid bndId should return CELIX_ILLEGAL_ARGUMENT + celix_status_t status = celix_framework_waitForScheduledEvent( + ctx->getFramework()->getCFramework(), 404, 1); + EXPECT_EQ(status, CELIX_ILLEGAL_ARGUMENT); } TEST_F(ScheduledEventTestSuite, WakeUpEventTest) { @@ -398,12 +402,12 @@ TEST_F(ScheduledEventTestSuite, CxxScheduledEventRAIITest) { } // When the event goes out of scope - // When the remove callback is called - EXPECT_TRUE(removed.load()); - // When waiting longer than the initial delay std::this_thread::sleep_for(std::chrono::milliseconds{60}); + // When the remove callback is called + EXPECT_TRUE(removed.load()); + // Then the count is not increased EXPECT_EQ(0, count.load()); } @@ -519,4 +523,111 @@ TEST_F(ScheduledEventTestSuite, CxxCancelOneShotEventBeforeFiredTest) { event.cancel(); //Then the event is not fired and does not leak -} \ No newline at end of file +} + +TEST_F(ScheduledEventTestSuite, RemoveScheduledEventAsync) { + std::atomic<int> count{0}; + auto callback = [](void* data) { + auto* count = static_cast<std::atomic<int>*>(data); + count->fetch_add(1); + }; + + //Given a scheduled event with am initial delay of 1ms + celix_scheduled_event_options_t opts{}; + opts.initialDelayInSeconds = 0.001; + opts.callbackData = &count; + opts.callback = callback; + long eventId = celix_bundleContext_scheduleEvent(fw->getFrameworkBundleContext()->getCBundleContext(), &opts); + EXPECT_GE(eventId, 0); + + //When the event is removed async + celix_bundleContext_removeScheduledEventAsync(fw->getFrameworkBundleContext()->getCBundleContext(), eventId); + + //And waiting longer than the initial delay + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + + //Then the event is not fired + EXPECT_EQ(0, count.load()); +} + +TEST_F(ScheduledEventTestSuite, WaitForScheduledEvent) { + std::atomic<int> count{0}; + auto callback = [](void* data) { + auto* count = static_cast<std::atomic<int>*>(data); + count->fetch_add(1); + }; + + // Given a scheduled event with an initial delay of 1ms and an interval of 1ms + celix_scheduled_event_options_t opts{}; + opts.initialDelayInSeconds = 0.001; + opts.intervalInSeconds = 0.001; + opts.callbackData = &count; + opts.callback = callback; + long eventId = celix_bundleContext_scheduleEvent(fw->getFrameworkBundleContext()->getCBundleContext(), &opts); + EXPECT_GE(eventId, 0); + + //When waiting for the event with a timeout longer than the initial delay + auto status = + celix_bundleContext_waitForScheduledEvent(fw->getFrameworkBundleContext()->getCBundleContext(), eventId, 1); + + //Then the return status is success + EXPECT_EQ(CELIX_SUCCESS, status); + + //And the event is fired + EXPECT_EQ(1, count.load()); + + //When waiting to short for the event + status = celix_bundleContext_waitForScheduledEvent(fw->getFrameworkBundleContext()->getCBundleContext(), eventId, 0.0001); + + //Then the return status is timeout + EXPECT_EQ(CELIX_TIMEOUT, status); + + //When waiting for the event with a timeout longer than the interval + status = celix_bundleContext_waitForScheduledEvent(fw->getFrameworkBundleContext()->getCBundleContext(), eventId, 1); + + //Then the return status is success + EXPECT_EQ(CELIX_SUCCESS, status); + + //And the event is fired again + EXPECT_EQ(2, count.load()); + + celix_bundleContext_removeScheduledEvent(fw->getFrameworkBundleContext()->getCBundleContext(), eventId); +} + +TEST_F(ScheduledEventTestSuite, CxxWaitForScheduledEvent) { + std::atomic<int> count{0}; + auto callback = [&count]() { + count.fetch_add(1); + }; + + // Given a scheduled event with an initial delay of 1ms and an interval of 1ms + auto event = fw->getFrameworkBundleContext()->scheduledEvent() + .withInitialDelay(std::chrono::milliseconds{1}) + .withInterval(std::chrono::milliseconds{1}) + .withCallback(callback) + .build(); + + //When waiting for the event with a timeout longer than the initial delay + auto success = event.waitFor(std::chrono::milliseconds{1000}); + + //Then the return status is success + EXPECT_TRUE(success); + + //And the event is fired + EXPECT_EQ(1, count.load()); + + //When waiting to short for the event + success = event.waitFor(std::chrono::microseconds {1}); + + //Then the return status is false (timeout) + EXPECT_FALSE(success); + + //When waiting for the event with a timeout longer than the interval + success = event.waitFor(std::chrono::milliseconds{1000}); + + //Then the return status is success + EXPECT_TRUE(success); + + //And the event is fired again + EXPECT_EQ(2, count.load()); +} diff --git a/libs/framework/include/celix/ScheduledEvent.h b/libs/framework/include/celix/ScheduledEvent.h index 131316a7..510c8a77 100644 --- a/libs/framework/include/celix/ScheduledEvent.h +++ b/libs/framework/include/celix/ScheduledEvent.h @@ -32,6 +32,7 @@ namespace celix { * This class uses RAII to automatically remove the (non one-shot) scheduled event from the bundle context * when it is destroyed. For one-shot scheduled events, the destructor will not remove the scheduled event. */ + //TODO update blocking calls with a timeout exception class ScheduledEvent final { public: friend class ScheduledEventBuilder; @@ -46,7 +47,7 @@ class ScheduledEvent final { */ ~ScheduledEvent() noexcept { if (!isOneShot) { - cancel(); + celix_bundleContext_tryRemoveScheduledEventAsync(ctx.get(), eventId); } } @@ -57,13 +58,15 @@ class ScheduledEvent final { ScheduledEvent& operator=(ScheduledEvent&&) noexcept = default; /** - * @brief Cancels the scheduled event. Can be called multiple times. When this function returns, no more scheduled - * event callbacks will be called and, if configured, the remove callback is called. + * @brief Cancels the scheduled event. * + * This method will block until a possible in-progress scheduled event callback is finished, the scheduled event + * is removed and, if configured, the remove callback is called. + * Should not be called multiple times. */ void cancel() { if (ctx) { - celix_bundleContext_tryRemoveScheduledEvent(ctx.get(), eventId); + celix_bundleContext_removeScheduledEvent(ctx.get(), eventId); } } @@ -73,6 +76,25 @@ class ScheduledEvent final { */ void wakeup() { wakeup(std::chrono::duration<double>{0}); } + /** + * @brief Wait until the next scheduled event is processed. + * + * @tparam Rep The representation type of the duration. + * @tparam Period The period type of the duration. + * @param[in] waitTime The maximum time to wait for the next scheduled event. If <= 0 the function will return + * immediately. + * @return true if the next scheduled event was processed, false if a timeout occurred. + */ + template <typename Rep, typename Period> + bool waitFor(std::chrono::duration<Rep, Period> waitTime) { + double waitTimeInSeconds = std::chrono::duration_cast<std::chrono::duration<double>>(waitTime).count(); + celix_status_t status = CELIX_SUCCESS; + if (ctx) { + status = celix_bundleContext_waitForScheduledEvent(ctx.get(), eventId, waitTimeInSeconds); + } + return status == CELIX_SUCCESS; + } + /** * @brief Wakes up the scheduled event with an optional wait time. * diff --git a/libs/framework/include/celix_bundle_context.h b/libs/framework/include/celix_bundle_context.h index b930b868..4a6383be 100644 --- a/libs/framework/include/celix_bundle_context.h +++ b/libs/framework/include/celix_bundle_context.h @@ -1280,8 +1280,8 @@ typedef struct celix_scheduled_event_options { CELIX_OPTS_INIT; /**< @brief Data passed to the done callback function when a scheduled event is removed.*/ void (*removeCallback)(void* removeCallbackData) - CELIX_OPTS_INIT; /**< @brief Callback function called when a scheduled event is removed. Can called from the - event thread or another thread.*/ + CELIX_OPTS_INIT; /**< @brief Callback function called when a scheduled event is removed. Will be called on + the event thread.*/ } celix_scheduled_event_options_t; #define CELIX_EMPTY_SCHEDULED_EVENT_OPTIONS {NULL, 0.0, 0.0, NULL, NULL, NULL, NULL} @@ -1335,31 +1335,57 @@ CELIX_FRAMEWORK_EXPORT celix_status_t celix_bundleContext_wakeupScheduledEvent( long scheduledEventId, double waitTimeInSeconds); +/** + * @brief Wait until the next scheduled event is processed. + * @param[in] ctx The bundle context. + * @param[in] scheduledEventId The scheduled event id to wait for. + * @param[in] waitTimeInSeconds The maximum time to wait for the next scheduled event. If <= 0 the function will return + * immediately. + * @return CELIX_SUCCESS if the scheduled event is woken up, CELIX_ILLEGAL_ARGUMENT if the scheduled event id is not + * known and CELIX_TIMEOUT if the waitTimeInSeconds is reached. + */ +CELIX_FRAMEWORK_EXPORT celix_status_t celix_bundleContext_waitForScheduledEvent(celix_bundle_context_t* ctx, + long scheduledEventId, + double waitTimeInSeconds); + /** * @brief Cancel and remove a scheduled event. * - * When this function returns, no more scheduled event callbacks will be called amd, if configured, the remove callback - * is called. + * This function will block until a possible in-progress scheduled event callback is finished, the scheduled event + * is removed and, if configured, the remove callback is called. * * @param[in] ctx The bundle context. - * @param[in] scheduledEventId The scheduled event id to cancel. + * @param[in] scheduledEventId The scheduled event id to cancel and remove. * @return true if a scheduled event is cancelled, false if the scheduled event id is not known. */ CELIX_FRAMEWORK_EXPORT bool celix_bundleContext_removeScheduledEvent(celix_bundle_context_t* ctx, long scheduledEventId); /** - * @brief Try to cancel and remove a scheduled event. + * @brief Cancel and remove a scheduled event asynchronously. + * + * When this function returns, no new scheduled event callbacks will be called, but it is not guaranteed that there + * is still a scheduled event callback in progress and that the remove callback is called. + * + * @param[in] ctx The bundle context. + * @param[in] scheduledEventId The scheduled event id to cancel and remove. + * @return true if a scheduled event is cancelled, false if the scheduled event id is not known. + */ +CELIX_FRAMEWORK_EXPORT bool celix_bundleContext_removeScheduledEventAsync(celix_bundle_context_t* ctx, + long scheduledEventId); + +/** + * @brief Try to cancel and remove a scheduled event asynchronously. * - * When this function returns, no more scheduled event callbacks will be called amd, if configured, the remove callback - * is called. + * When this function returns, no new scheduled event callbacks will be called, but it is not guaranteed that there + * is still a scheduled event callback in progress and that the remove callback is called. * Will not log an error if the scheduled event id is not known. * * @param[in] ctx The bundle context. * @param[in] scheduledEventId The scheduled event id to cancel. * @return true if a scheduled event is cancelled, false if the scheduled event id is not known. */ -CELIX_FRAMEWORK_EXPORT bool celix_bundleContext_tryRemoveScheduledEvent(celix_bundle_context_t* ctx, +CELIX_FRAMEWORK_EXPORT bool celix_bundleContext_tryRemoveScheduledEventAsync(celix_bundle_context_t* ctx, long scheduledEventId); /** diff --git a/libs/framework/src/bundle_context.c b/libs/framework/src/bundle_context.c index d58e3794..a0c26ea7 100644 --- a/libs/framework/src/bundle_context.c +++ b/libs/framework/src/bundle_context.c @@ -1509,13 +1509,22 @@ celix_status_t celix_bundleContext_wakeupScheduledEvent( return celix_framework_wakeupScheduledEvent(ctx->framework, scheduledEventId, waitTimeInSeconds); } +celix_status_t celix_bundleContext_waitForScheduledEvent(celix_bundle_context_t* ctx, + long scheduledEventId, + double waitTimeInSeconds) { + return celix_framework_waitForScheduledEvent(ctx->framework, scheduledEventId, waitTimeInSeconds); +} + bool celix_bundleContext_removeScheduledEvent(celix_bundle_context_t* ctx, long scheduledEventId) { - return celix_framework_removeScheduledEvent(ctx->framework, false, scheduledEventId); + return celix_framework_removeScheduledEvent(ctx->framework, false, true, scheduledEventId); +} + +bool celix_bundleContext_removeScheduledEventAsync(celix_bundle_context_t* ctx, long scheduledEventId) { + return celix_framework_removeScheduledEvent(ctx->framework, true, true, scheduledEventId); } -CELIX_FRAMEWORK_EXPORT bool celix_bundleContext_tryRemoveScheduledEvent(celix_bundle_context_t* ctx, - long scheduledEventId) { - return celix_framework_removeScheduledEvent(ctx->framework, true, scheduledEventId); +bool celix_bundleContext_tryRemoveScheduledEventAsync(celix_bundle_context_t* ctx, long scheduledEventId) { + return celix_framework_removeScheduledEvent(ctx->framework, true, false, scheduledEventId); } celix_bundle_t* celix_bundleContext_getBundle(const celix_bundle_context_t *ctx) { diff --git a/libs/framework/src/celix_scheduled_event.c b/libs/framework/src/celix_scheduled_event.c index 87ae8c7a..99d94178 100644 --- a/libs/framework/src/celix_scheduled_event.c +++ b/libs/framework/src/celix_scheduled_event.c @@ -28,6 +28,12 @@ */ #define CELIX_SCHEDULED_EVENT_INTERVAL_ALLOW_ERROR_IN_SECONDS 0.000001 +/** + * @brief The timeout in seconds, before a log message is printed while waiting for a scheduled event to be processed. + */ +#define CELIX_SCHEDULED_EVENT_ERROR_LOG_TIMEOUT_IN_SECONDS 10.0 + + /** * @brief Default name for a scheduled event. */ @@ -63,10 +69,11 @@ struct celix_scheduled_event { celix_thread_mutex_t mutex; /**< The mutex to protect the data below. */ celix_thread_cond_t cond; /**< The condition variable to signal the scheduled event for a changed callCount and - isProcessing. */ + isRemoved. */ size_t useCount; /**< The use count of the scheduled event. */ size_t callCount; /**< The call count of the scheduled event. */ - bool isProcessing; /**< Whether the scheduled event is currently being processed. */ + bool isMarkedForRemoval; /**< Whether the scheduled event is marked for removal. */ + bool isRemoved; /**< Whether the scheduled event is removed. */ struct timespec lastScheduledEventTime; /**< The last scheduled event time of the scheduled event. */ bool processForWakeup; /**< Whether the scheduled event should be processed directly due to a wakeupScheduledEvent call. */ @@ -108,9 +115,10 @@ celix_scheduled_event_t* celix_scheduledEvent_create(celix_framework_logger_t* l event->callback = callback; event->removedCallbackData = removedCallbackData; event->removedCallback = removedCallback; + event->isMarkedForRemoval = false; event->useCount = 1; event->callCount = 0; - event->isProcessing = false; + event->isRemoved = false; clock_gettime(CLOCK_MONOTONIC, &event->lastScheduledEventTime); event->processForWakeup = false; @@ -150,9 +158,6 @@ void celix_scheduledEvent_release(celix_scheduled_event_t* event) { celixThreadMutex_unlock(&event->mutex); if (unused) { - if (event->removedCallback) { - event->removedCallback(event->removedCallbackData); - } celix_scheduledEvent_destroy(event); } } @@ -161,10 +166,6 @@ const char* celix_scheduledEvent_getName(const celix_scheduled_event_t* event) { long celix_scheduledEvent_getId(const celix_scheduled_event_t* event) { return event->scheduledEventId; } -double celix_scheduledEvent_getIntervalInSeconds(const celix_scheduled_event_t* event) { - return event->intervalInSeconds; -} - long celix_scheduledEvent_getBundleId(const celix_scheduled_event_t* event) { return event->bndId; } @@ -198,27 +199,22 @@ void celix_scheduledEvent_process(celix_scheduled_event_t* event, const struct t "Processing scheduled event %s for bundle id %li", event->eventName, event->bndId); - - celixThreadMutex_lock(&event->mutex); - event->isProcessing = true; - celixThreadMutex_unlock(&event->mutex); assert(event->callback != NULL); event->callback(event->callbackData); // note called outside of lock celixThreadMutex_lock(&event->mutex); event->lastScheduledEventTime = *currentTime; - event->isProcessing = false; event->callCount += 1; event->processForWakeup = false; - celixThreadCondition_broadcast(&event->cond); // broadcast for changed callCount and isProcessing + celixThreadCondition_broadcast(&event->cond); //for changed callCount celixThreadMutex_unlock(&event->mutex); } -bool celix_scheduleEvent_isSingleShotDone(celix_scheduled_event_t* event) { +bool celix_scheduledEvent_isSingleShot(celix_scheduled_event_t* event) { bool isDone = false; celixThreadMutex_lock(&event->mutex); - isDone = event->intervalInSeconds == 0 && event->callCount > 0; + isDone = event->intervalInSeconds == 0; celixThreadMutex_unlock(&event->mutex); return isDone; } @@ -260,27 +256,62 @@ celix_status_t celix_scheduledEvent_waitForAtLeastCallCount(celix_scheduled_even return status; } -celix_status_t celix_scheduledEvent_waitForProcessing(celix_scheduled_event_t* event) { - celix_status_t status = CELIX_SUCCESS; +void celix_scheduledEvent_waitForRemoved(celix_scheduled_event_t* event) { struct timespec start = celix_gettime(CLOCK_MONOTONIC); - struct timespec absTimeoutTime = celix_addDelayInSecondsToTime(&start, CELIX_SCHEDULED_EVENT_TIMEOUT_WAIT_FOR_PROCESSING_IN_SECONDS); + struct timespec absLogTimeout = celix_addDelayInSecondsToTime(&start, CELIX_SCHEDULED_EVENT_ERROR_LOG_TIMEOUT_IN_SECONDS); celixThreadMutex_lock(&event->mutex); - while (event->isProcessing) { - celixThreadCondition_waitUntil(&event->cond, &event->mutex, &absTimeoutTime); - struct timespec now = celix_gettime(CLOCK_MONOTONIC); - if (celix_difftime(&start, &now) > CELIX_SCHEDULED_EVENT_TIMEOUT_WAIT_FOR_PROCESSING_IN_SECONDS) { + while (!event->isRemoved) { + celix_status_t waitStatus = celixThreadCondition_waitUntil(&event->cond, &event->mutex, &absLogTimeout); + if (waitStatus == ETIMEDOUT) { + fw_log(event->logger, + CELIX_LOG_LEVEL_WARNING, + "Timeout while waiting for removal of scheduled event '%s' (id=%li) for bundle id %li.", + event->eventName, + event->scheduledEventId, + event->bndId); + start = celix_gettime(CLOCK_MONOTONIC); + absLogTimeout = celix_addDelayInSecondsToTime(&start, CELIX_SCHEDULED_EVENT_ERROR_LOG_TIMEOUT_IN_SECONDS); + } + } + celixThreadMutex_unlock(&event->mutex); +} + +celix_status_t celix_scheduledEvent_wait(celix_scheduled_event_t* event, double timeoutInSeconds) { + celix_status_t status = CELIX_SUCCESS; + celixThreadMutex_lock(&event->mutex); + size_t targetCallCount = event->callCount + 1; + struct timespec start = celix_gettime(CLOCK_MONOTONIC); + struct timespec absTimeoutTime = celix_addDelayInSecondsToTime(&start, timeoutInSeconds); + while (event->callCount < targetCallCount) { + celix_status_t waitStatus = celixThreadCondition_waitUntil(&event->cond, &event->mutex, &absTimeoutTime); + if (waitStatus == ETIMEDOUT) { status = CELIX_TIMEOUT; break; } } celixThreadMutex_unlock(&event->mutex); - if (status == CELIX_TIMEOUT) { - fw_log(event->logger, - CELIX_LOG_LEVEL_WARNING, - "Timeout while waiting for scheduled event '%s' (id=%li) for bundle id %li to finish processing", - event->eventName, - event->scheduledEventId, - event->bndId); - } return status; } + +void celix_scheduledEvent_setRemoved(celix_scheduled_event_t* event) { + if (event->removedCallback) { + event->removedCallback(event->removedCallbackData); + } + celixThreadMutex_lock(&event->mutex); + event->isRemoved = true; + celixThreadCondition_broadcast(&event->cond); //for changed isRemoved + celixThreadMutex_unlock(&event->mutex); +} + +void celix_scheduledEvent_markForRemoval(celix_scheduled_event_t* event) { + celixThreadMutex_lock(&event->mutex); + event->isMarkedForRemoval = true; + celixThreadMutex_unlock(&event->mutex); +} + +bool celix_scheduledEvent_isMarkedForRemoval(celix_scheduled_event_t* event) { + celixThreadMutex_lock(&event->mutex); + bool isMarkedForRemoval = event->isMarkedForRemoval; + celixThreadMutex_unlock(&event->mutex); + return isMarkedForRemoval; +} diff --git a/libs/framework/src/celix_scheduled_event.h b/libs/framework/src/celix_scheduled_event.h index 1334434f..20c2cec9 100644 --- a/libs/framework/src/celix_scheduled_event.h +++ b/libs/framework/src/celix_scheduled_event.h @@ -29,8 +29,6 @@ extern "C" { typedef struct celix_scheduled_event celix_scheduled_event_t; -#define CELIX_SCHEDULED_EVENT_TIMEOUT_WAIT_FOR_PROCESSING_IN_SECONDS 30.0 - /** * @brief Create a scheduled event for the given bundle. * @@ -80,11 +78,6 @@ const char* celix_scheduledEvent_getName(const celix_scheduled_event_t* event); */ long celix_scheduledEvent_getId(const celix_scheduled_event_t* event); -/** - * @brief Returns the interval of the scheduled event in seconds. - */ -double celix_scheduledEvent_getIntervalInSeconds(const celix_scheduled_event_t* event); - /** * @brief Returns the bundle id of the bundle which created the scheduled event. */ @@ -114,11 +107,29 @@ bool celix_scheduledEvent_deadlineReached(celix_scheduled_event_t* event, void celix_scheduledEvent_process(celix_scheduled_event_t* event, const struct timespec* currentTime); /** - * @brief Returns true if the event is a one-shot event and is done. - * @param[in] event The event to check. - * @return true if the event is a one-shot event and is done. + * @brief Call the removed callback of the event and set the removed flag. + */ +void celix_scheduledEvent_setRemoved(celix_scheduled_event_t* event); + +/** + * @brief Wait indefinitely until the event is removed. + */ +void celix_scheduledEvent_waitForRemoved(celix_scheduled_event_t* event); + +/** + * @brief Returns true if the event is a one-shot event. + */ +bool celix_scheduledEvent_isSingleShot(celix_scheduled_event_t* event); + +/** + * @brief Mark the event for removal. The event will be removed on the event thread, after the next processing. */ -bool celix_scheduleEvent_isSingleShotDone(celix_scheduled_event_t* event); +void celix_scheduledEvent_markForRemoval(celix_scheduled_event_t* event); + +/** + * @brief Returns true if the event is marked for removal. + */ +bool celix_scheduledEvent_isMarkedForRemoval(celix_scheduled_event_t* event); /** * @brief Configure a scheduled event for a wakeup, so celix_scheduledEvent_deadlineReached will return true until @@ -141,15 +152,13 @@ celix_status_t celix_scheduledEvent_waitForAtLeastCallCount(celix_scheduled_even size_t targetCallCount, double waitTimeInSeconds); - /** - * @brief Wait for a scheduled event to be done with processing. + * @brief Wait for a scheduled event to be done with the next scheduled processing. * @param[in] event The event to wait for. - * @param[in] waitTimeInSeconds The max time to wait in seconds. Must be > 0. - * @return CELIX_SUCCESS if the scheduled event is done with processing, CELIX_TIMEOUT if the scheduled event - * is not done with processing within the waitTimeInSeconds. + * @param[in] timeoutInSeconds The max time to wait in seconds. Must be > 0. + * @return CELIX_SUCCESS if the scheduled event is done with processing, CELIX_TIMEOUT if the scheduled event. */ -celix_status_t celix_scheduledEvent_waitForProcessing(celix_scheduled_event_t* event); +celix_status_t celix_scheduledEvent_wait(celix_scheduled_event_t* event, double timeoutInSeconds); #ifdef __cplusplus }; diff --git a/libs/framework/src/framework.c b/libs/framework/src/framework.c index 7e524e52..8232ed04 100644 --- a/libs/framework/src/framework.c +++ b/libs/framework/src/framework.c @@ -1426,23 +1426,36 @@ static inline void fw_handleEvents(celix_framework_t* framework) { * @brief Process all scheduled events. */ static double celix_framework_processScheduledEvents(celix_framework_t* fw) { - double nextScheduledEvent = 0.0; struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); + double nextClosestScheduledEvent; celix_scheduled_event_t* callEvent; + celix_scheduled_event_t* removeEvent; do { + nextClosestScheduledEvent = -1; //negative means no event next event callEvent = NULL; + removeEvent = NULL; double nextEvent; celixThreadMutex_lock(&fw->dispatcher.mutex); CELIX_LONG_HASH_MAP_ITERATE(fw->dispatcher.scheduledEvents, entry) { celix_scheduled_event_t* visit = entry.value.ptrValue; + if (celix_scheduledEvent_isMarkedForRemoval(visit)) { + removeEvent = visit; + celix_longHashMap_remove(fw->dispatcher.scheduledEvents, celix_scheduledEvent_getId(visit)); + break; + } + bool call = celix_scheduledEvent_deadlineReached(visit, &ts, &nextEvent); - if (nextScheduledEvent == 0.0 || nextEvent < nextScheduledEvent) { - nextScheduledEvent = nextEvent; + if (nextClosestScheduledEvent < 0 || nextEvent < nextClosestScheduledEvent) { + nextClosestScheduledEvent = nextEvent; } if (call) { callEvent = visit; + if (celix_scheduledEvent_isSingleShot(visit)) { + removeEvent = visit; + celix_longHashMap_remove(fw->dispatcher.scheduledEvents, celix_scheduledEvent_getId(visit)); + } break; } } @@ -1450,63 +1463,56 @@ static double celix_framework_processScheduledEvents(celix_framework_t* fw) { if (callEvent != NULL) { celix_scheduledEvent_process(callEvent, &ts); - if (celix_scheduleEvent_isSingleShotDone(callEvent)) { - celixThreadMutex_lock(&fw->dispatcher.mutex); - fw_log(fw->logger, - CELIX_LOG_LEVEL_DEBUG, - "Removing processed one-shot scheduled event '%s' (id=%li) for bundle if %li.", - celix_scheduledEvent_getName(callEvent), - celix_scheduledEvent_getId(callEvent), - celix_scheduledEvent_getBundleId(callEvent)); - celix_longHashMap_remove(fw->dispatcher.scheduledEvents, celix_scheduledEvent_getId(callEvent)); - celix_scheduledEvent_release(callEvent); - celixThreadMutex_unlock(&fw->dispatcher.mutex); - } } - } while (callEvent != NULL); + if (removeEvent != NULL) { + const char* formatStr = celix_scheduledEvent_isSingleShot(removeEvent) ? + "Removing processed one-shot scheduled event '%s' (id=%li) for bundle if %li.": + "Removing processed scheduled event '%s' (id=%li) for bundle if %li."; + fw_log(fw->logger, + CELIX_LOG_LEVEL_DEBUG, + formatStr, + celix_scheduledEvent_getName(removeEvent), + celix_scheduledEvent_getId(removeEvent), + celix_scheduledEvent_getBundleId(removeEvent)); + celix_scheduledEvent_setRemoved(removeEvent); + celix_scheduledEvent_release(removeEvent); + } + } while (callEvent != NULL && removeEvent != NULL); - return nextScheduledEvent; + return nextClosestScheduledEvent; } void celix_framework_cleanupScheduledEvents(celix_framework_t* fw, long bndId) { - celix_scheduled_event_t* removedEvent; + celix_scheduled_event_t* removeEvent; do { - removedEvent = NULL; + removeEvent = NULL; celixThreadMutex_lock(&fw->dispatcher.mutex); CELIX_LONG_HASH_MAP_ITERATE(fw->dispatcher.scheduledEvents, entry) { celix_scheduled_event_t* visit = entry.value.ptrValue; if (bndId == celix_scheduledEvent_getBundleId(visit)) { - celix_longHashMap_remove(fw->dispatcher.scheduledEvents, celix_scheduledEvent_getId(visit)); - removedEvent = visit; + removeEvent = visit; + celix_scheduledEvent_retain(removeEvent); + if (!celix_scheduledEvent_isSingleShot(removeEvent)) { + fw_log(fw->logger, + CELIX_LOG_LEVEL_WARNING, + "Removing dangling scheduled event '%s' (id=%li) for bundle id %li. This scheduled event should " + "have been removed up by the bundle.", + celix_scheduledEvent_getName(removeEvent), + celix_scheduledEvent_getId(removeEvent), + celix_scheduledEvent_getBundleId(removeEvent)); + } + celix_scheduledEvent_markForRemoval(removeEvent); + celixThreadCondition_broadcast(&fw->dispatcher.cond); //notify that scheduled event is marked for removal break; } } celixThreadMutex_unlock(&fw->dispatcher.mutex); - if (removedEvent) { - const char* eventName = celix_scheduledEvent_getName(removedEvent); - double interval = celix_scheduledEvent_getIntervalInSeconds(removedEvent); - long eventId = celix_scheduledEvent_getId(removedEvent); - long eventBndId = celix_scheduledEvent_getBundleId(removedEvent); - if (interval > 0) { - fw_log(fw->logger, - CELIX_LOG_LEVEL_WARNING, - "Removing dangling scheduled event '%s' (id=%li) for bundle id %li. This should have been cleaned up by the bundle.", - eventName, - eventId, - eventBndId); - } else { - fw_log(fw->logger, - CELIX_LOG_LEVEL_DEBUG, - "Removing unprocessed one-shot scheduled event '%s' (id=%li) for bundle id %li.", - eventName, - eventId, - eventBndId); - } - celix_scheduledEvent_waitForProcessing(removedEvent); - celix_scheduledEvent_release(removedEvent); + if (removeEvent) { + celix_scheduledEvent_waitForRemoved(removeEvent); + celix_scheduledEvent_release(removeEvent); } - } while (removedEvent != NULL); + } while (removeEvent != NULL); } static void celix_framework_waitForNextEvent(celix_framework_t* fw, double nextScheduledEvent) { @@ -2581,10 +2587,12 @@ long celix_framework_scheduleEvent(celix_framework_t* fw, celix_status_t celix_framework_wakeupScheduledEvent(celix_framework_t* fw, long scheduledEventId, double waitTimeInSeconds) { + size_t callCountAfterWakeup; celixThreadMutex_lock(&fw->dispatcher.mutex); celix_scheduled_event_t* event = celix_longHashMap_get(fw->dispatcher.scheduledEvents, scheduledEventId); if (event != NULL) { celix_scheduledEvent_retain(event); + callCountAfterWakeup = celix_scheduledEvent_configureWakeup(event); } celixThreadMutex_unlock(&fw->dispatcher.mutex); @@ -2596,8 +2604,6 @@ celix_status_t celix_framework_wakeupScheduledEvent(celix_framework_t* fw, return CELIX_ILLEGAL_ARGUMENT; } - - size_t callCountAfterWakeup = celix_scheduledEvent_configureWakeup(event); celixThreadMutex_lock(&fw->dispatcher.mutex); celixThreadCondition_broadcast(&fw->dispatcher.cond); //notify dispatcher thread for configured wakeup celixThreadMutex_unlock(&fw->dispatcher.mutex); @@ -2615,30 +2621,54 @@ celix_status_t celix_framework_wakeupScheduledEvent(celix_framework_t* fw, return status; } -bool celix_framework_removeScheduledEvent(celix_framework_t* fw, bool errorIfNotFound, long scheduledEventId) { +celix_status_t +celix_framework_waitForScheduledEvent(celix_framework_t* fw, long scheduledEventId, double waitTimeInSeconds) { + celixThreadMutex_lock(&fw->dispatcher.mutex); + celix_scheduled_event_t* event = celix_longHashMap_get(fw->dispatcher.scheduledEvents, scheduledEventId); + if (event != NULL) { + celix_scheduledEvent_retain(event); + } + celixThreadMutex_unlock(&fw->dispatcher.mutex); + + if (event == NULL) { + fw_log(fw->logger, + CELIX_LOG_LEVEL_WARNING, + "Cannot wait for scheduled event. Unknown scheduled event id %li.", + scheduledEventId); + return CELIX_ILLEGAL_ARGUMENT; + } + + celix_status_t status = celix_scheduledEvent_wait(event, waitTimeInSeconds); + celix_scheduledEvent_release(event); + return status; +} + +bool celix_framework_removeScheduledEvent(celix_framework_t* fw, + bool async, + bool errorIfNotFound, + long scheduledEventId) { if (scheduledEventId < 0) { - return false; //silently ignore + return false; // silently ignore } celixThreadMutex_lock(&fw->dispatcher.mutex); celix_scheduled_event_t* event = celix_longHashMap_get(fw->dispatcher.scheduledEvents, scheduledEventId); - celix_longHashMap_remove(fw->dispatcher.scheduledEvents, scheduledEventId); + if (event) { + celix_scheduledEvent_retain(event); + celix_scheduledEvent_markForRemoval(event); + celixThreadCondition_broadcast(&fw->dispatcher.cond); //notify dispatcher thread for removed scheduled event + } celixThreadMutex_unlock(&fw->dispatcher.mutex); - if (event == NULL) { + if (!event) { celix_log_level_e level = errorIfNotFound ? CELIX_LOG_LEVEL_TRACE : CELIX_LOG_LEVEL_ERROR; - fw_log(fw->logger, level, "Cannot remove scheduled event with id %li. Not found.", - scheduledEventId); + fw_log(fw->logger, level, "Cannot remove scheduled event with id %li. Not found.", scheduledEventId); return false; } - fw_log(fw->logger, - CELIX_LOG_LEVEL_DEBUG, - "Removing scheduled event '%s' (id=%li) for bundle id %li.", - celix_scheduledEvent_getName(event), - celix_scheduledEvent_getId(event), - celix_scheduledEvent_getBundleId(event)); - celix_scheduledEvent_waitForProcessing(event); + if (!async) { + celix_scheduledEvent_waitForRemoved(event); + } celix_scheduledEvent_release(event); return true; } diff --git a/libs/framework/src/framework_private.h b/libs/framework/src/framework_private.h index 45e2e41a..e9e5173b 100644 --- a/libs/framework/src/framework_private.h +++ b/libs/framework/src/framework_private.h @@ -490,17 +490,31 @@ celix_status_t celix_framework_wakeupScheduledEvent(celix_framework_t* fw, long scheduledEventId, double waitTimeInSeconds); +/** + * @brief Wait for the next scheduled event to be processed. + * @param[in] fw The Celix framework + * @param[in] scheduledEventId The scheduled event id to wait for. + * @param[in] waitTimeInSeconds The maximum time to wait for the next scheduled event. If <= 0 the function will return + * immediately. + * @return CELIX_SUCCESS if the scheduled event is woken up, CELIX_ILLEGAL_ARGUMENT if the scheduled event id is not + * known and CELIX_TIMEOUT if the waitTimeInSeconds is reached. + */ +celix_status_t celix_framework_waitForScheduledEvent(celix_framework_t* fw, + long scheduledEventId, + double waitTimeInSeconds); + /** * @brief Cancel a scheduled event. * * When this function returns, no more scheduled event callbacks will be called. * * @param[in] fw The Celix framework + * @param[in] async If true, the scheduled event will be cancelled asynchronously and the function will not block. * @param[in] errorIfNotFound If true, removal of a non existing scheduled event id will not be logged. * @param[in] scheduledEventId The scheduled event id to cancel. * @return true if a scheduled event is cancelled, false if the scheduled event id is not known. */ -bool celix_framework_removeScheduledEvent(celix_framework_t* fw, bool errorIfNotFound, long scheduledEventId); +bool celix_framework_removeScheduledEvent(celix_framework_t* fw, bool async, bool errorIfNotFound, long scheduledEventId); /** * Remove all scheduled events for the provided bundle id and logs warning if there are still un-removed scheduled
