This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/scheduled_event_on_event_thread
in repository https://gitbox.apache.org/repos/asf/celix.git

commit e72bb907e5fca5c807dfca5bea3c35850377f21c
Author: Pepijn Noltes <[email protected]>
AuthorDate: Sun Jun 11 23:51:29 2023 +0200

    Fix mem leak in schedule event handling
---
 .../framework/gtest/src/ScheduledEventTestSuite.cc | 21 +++++--
 libs/framework/include/celix/ScheduledEvent.h      | 65 +++++++---------------
 libs/framework/include/celix_bundle_context.h      |  5 +-
 libs/framework/src/celix_scheduled_event.c         | 38 +++++++++++--
 libs/framework/src/celix_scheduled_event.h         | 14 ++++-
 libs/framework/src/framework.c                     |  6 +-
 6 files changed, 91 insertions(+), 58 deletions(-)

diff --git a/libs/framework/gtest/src/ScheduledEventTestSuite.cc 
b/libs/framework/gtest/src/ScheduledEventTestSuite.cc
index 9e53e69b..609d0ef0 100644
--- a/libs/framework/gtest/src/ScheduledEventTestSuite.cc
+++ b/libs/framework/gtest/src/ScheduledEventTestSuite.cc
@@ -501,9 +501,22 @@ TEST_F(ScheduledEventTestSuite, TimeoutOnWakeupTest) {
 
     auto status =
         
celix_bundleContext_wakeupScheduledEvent(fw->getFrameworkBundleContext()->getCBundleContext(),
 eventId, 0.001);
-    EXPECT_EQ(CELIX_TIMEOUT, status);
+    EXPECT_EQ(CELIX_TIMEOUT, status);;
+}
 
-    //sleep to ensure the event is processed
-    //TODO fixme, if removed the tests leaks
-    std::this_thread::sleep_for(std::chrono::milliseconds{10});
+TEST_F(ScheduledEventTestSuite, CxxCancelOneShotEventBeforeFiredTest) {
+    auto callback = []() {
+        FAIL() << "Should not be called";
+    };
+
+    //Given a one shot scheduled event with an initial delay of 1s
+    auto event = fw->getFrameworkBundleContext()->scheduledEvent()
+                     .withInitialDelay(std::chrono::seconds{1})
+                     .withCallback(callback)
+                     .build();
+
+    //When the event is cancelled before the initial delay
+    event.cancel();
+
+    //Then the event is not fired and does not leak
 }
\ No newline at end of file
diff --git a/libs/framework/include/celix/ScheduledEvent.h 
b/libs/framework/include/celix/ScheduledEvent.h
index 6358f0ca..131316a7 100644
--- a/libs/framework/include/celix/ScheduledEvent.h
+++ b/libs/framework/include/celix/ScheduledEvent.h
@@ -95,6 +95,11 @@ class ScheduledEvent final {
     }
 
   private:
+    struct Callbacks {
+        std::function<void()> callback{};              /**< The callback for 
the scheduled event. */
+        std::function<void()> removeCallback{};        /**< The remove 
callback for the scheduled event. */
+    };
+
     /**
      * @brief Constructs a scheduled event using the given bundle context and 
options.
      *
@@ -107,55 +112,27 @@ class ScheduledEvent final {
                    std::function<void()> _removeCallback,
                    celix_scheduled_event_options_t& options) {
         ctx = std::move(_cCtx);
-        options.name = _name.c_str();
-        configureCallbacks(options, std::move(_callback), 
std::move(_removeCallback));
-        eventId = celix_bundleContext_scheduleEvent(ctx.get(), &options);
-    }
-
-    /**
-     * @brief Configure the callbacks for the scheduled event, ensuring the 
callbacks outlive the scheduled event.
-     */
-    void configureCallbacks(celix_scheduled_event_options_t& options,
-                            std::function<void()> _callback,
-                            std::function<void()> _removeCallback) {
         isOneShot = options.intervalInSeconds == 0;
-        if (isOneShot) {
-            options.callbackData = new std::function<void()>{
-                std::move(_callback)}; // to ensure callback outlives the 
scheduled event object
-            options.callback = [](void* data) {
-                auto* cb = static_cast<std::function<void()>*>(data);
-                (*cb)();
-                delete cb;
-            };
-            if (_removeCallback) {
-                options.removeCallbackData = new 
std::function<void()>{std::move(_removeCallback)};
-                options.removeCallback = [](void* data) {
-                    auto* cb = static_cast<std::function<void()>*>(data);
-                    (*cb)();
-                    delete cb;
-                };
-            }
-        } else {
-            callback = std::move(_callback);
-            removeCallback = std::move(_removeCallback);
-            options.callbackData = &callback;
-            options.callback = [](void* data) {
-                auto* cb = static_cast<std::function<void()>*>(data);
-                (*cb)();
-            };
-            if (removeCallback) {
-                options.removeCallbackData = &removeCallback;
-                options.removeCallback = [](void* data) {
-                    auto* cb = static_cast<std::function<void()>*>(data);
-                    (*cb)();
-                };
+        options.name = _name.c_str();
+        auto* callbacks = new Callbacks{std::move(_callback), 
std::move(_removeCallback)};
+        options.callbackData = callbacks;
+        options.callback = [](void* data) {
+            auto* callbacks = static_cast<Callbacks*>(data);
+            (callbacks->callback)();
+        };
+        options.removeCallbackData = callbacks;
+        options.removeCallback = [](void* data) {
+            auto* callbacks = static_cast<Callbacks*>(data);
+            if (callbacks->removeCallback) {
+                (callbacks->removeCallback)();
             }
-        }
+            delete callbacks;
+        };
+
+        eventId = celix_bundleContext_scheduleEvent(ctx.get(), &options);
     }
 
     std::shared_ptr<celix_bundle_context_t> ctx{}; /**< The bundle context for 
the scheduled event. */
-    std::function<void()> callback{};              /**< The callback for the 
scheduled event. */
-    std::function<void()> removeCallback{};        /**< The remove callback 
for the scheduled event. */
     long eventId{-1};                              /**< The ID of the 
scheduled event. */
     bool isOneShot{false};                         /**< Whether the scheduled 
event is a one-shot event. */
 };
diff --git a/libs/framework/include/celix_bundle_context.h 
b/libs/framework/include/celix_bundle_context.h
index d9296ff4..b930b868 100644
--- a/libs/framework/include/celix_bundle_context.h
+++ b/libs/framework/include/celix_bundle_context.h
@@ -1289,8 +1289,9 @@ typedef struct celix_scheduled_event_options {
 /**
  * @brief Add a scheduled event to the Celix framework.
  *
- * The scheduled event wil be called repeatedly on the Celix framework event 
thread using the provided interval.
- * For the interval time a monotonic clock is used. The event callback should 
be fast and non-blocking, otherwise
+ * The scheduled event will be called on the Celix framework event thread, 
repeatedly using the provided interval or
+ * once if only an initial delay is provided.
+ * The event callback should be relatively fast and the scheduled event 
interval should be relatively high, otherwise
  * the framework event queue will be blocked and framework will not function 
properly.
  *
  * Scheduled events can be scheduled later than the provided initial delay and 
interval, because they are processed
diff --git a/libs/framework/src/celix_scheduled_event.c 
b/libs/framework/src/celix_scheduled_event.c
index 9bad56da..87ae8c7a 100644
--- a/libs/framework/src/celix_scheduled_event.c
+++ b/libs/framework/src/celix_scheduled_event.c
@@ -62,9 +62,11 @@ struct celix_scheduled_event {
                                                             callback. */
 
     celix_thread_mutex_t mutex; /**< The mutex to protect the data below. */
-    celix_thread_cond_t cond;   /**< The condition variable to signal the 
scheduled event for a changed callCount. */
+    celix_thread_cond_t cond;   /**< The condition variable to signal the 
scheduled event for a changed callCount and
+                                   isProcessing. */
     size_t useCount;            /**< The use count of the scheduled event. */
     size_t callCount;           /**< The call count of the scheduled event. */
+    bool isProcessing;          /**< Whether the scheduled event is currently 
being processed. */
     struct timespec lastScheduledEventTime; /**< The last scheduled event time 
of the scheduled event. */
     bool processForWakeup; /**< Whether the scheduled event should be 
processed directly due to a wakeupScheduledEvent
                               call. */
@@ -108,6 +110,7 @@ celix_scheduled_event_t* 
celix_scheduledEvent_create(celix_framework_logger_t* l
     event->removedCallback = removedCallback;
     event->useCount = 1;
     event->callCount = 0;
+    event->isProcessing = false;
     clock_gettime(CLOCK_MONOTONIC, &event->lastScheduledEventTime);
     event->processForWakeup = false;
 
@@ -197,7 +200,7 @@ void celix_scheduledEvent_process(celix_scheduled_event_t* 
event, const struct t
            event->bndId);
 
     celixThreadMutex_lock(&event->mutex);
-    event->useCount += 1;
+    event->isProcessing = true;
     celixThreadMutex_unlock(&event->mutex);
     assert(event->callback != NULL);
 
@@ -205,14 +208,14 @@ void 
celix_scheduledEvent_process(celix_scheduled_event_t* event, const struct t
 
     celixThreadMutex_lock(&event->mutex);
     event->lastScheduledEventTime = *currentTime;
-    event->useCount -= 1;
+    event->isProcessing = false;
     event->callCount += 1;
     event->processForWakeup = false;
-    celixThreadCondition_broadcast(&event->cond); // broadcast for changed 
callCount
+    celixThreadCondition_broadcast(&event->cond); // broadcast for changed 
callCount and isProcessing
     celixThreadMutex_unlock(&event->mutex);
 }
 
-bool celix_scheduleEvent_isDone(celix_scheduled_event_t* event) {
+bool celix_scheduleEvent_isSingleShotDone(celix_scheduled_event_t* event) {
     bool isDone = false;
     celixThreadMutex_lock(&event->mutex);
     isDone = event->intervalInSeconds == 0 && event->callCount > 0;
@@ -256,3 +259,28 @@ celix_status_t 
celix_scheduledEvent_waitForAtLeastCallCount(celix_scheduled_even
     }
     return status;
 }
+
+celix_status_t celix_scheduledEvent_waitForProcessing(celix_scheduled_event_t* 
event) {
+    celix_status_t status = CELIX_SUCCESS;
+    struct timespec start = celix_gettime(CLOCK_MONOTONIC);
+    struct timespec absTimeoutTime = celix_addDelayInSecondsToTime(&start, 
CELIX_SCHEDULED_EVENT_TIMEOUT_WAIT_FOR_PROCESSING_IN_SECONDS);
+    celixThreadMutex_lock(&event->mutex);
+    while (event->isProcessing) {
+        celixThreadCondition_waitUntil(&event->cond, &event->mutex, 
&absTimeoutTime);
+        struct timespec now = celix_gettime(CLOCK_MONOTONIC);
+        if (celix_difftime(&start, &now) > 
CELIX_SCHEDULED_EVENT_TIMEOUT_WAIT_FOR_PROCESSING_IN_SECONDS) {
+            status = CELIX_TIMEOUT;
+            break;
+        }
+    }
+    celixThreadMutex_unlock(&event->mutex);
+    if (status == CELIX_TIMEOUT) {
+        fw_log(event->logger,
+               CELIX_LOG_LEVEL_WARNING,
+               "Timeout while waiting for scheduled event '%s' (id=%li) for 
bundle id %li to finish processing",
+               event->eventName,
+               event->scheduledEventId,
+               event->bndId);
+    }
+    return status;
+}
diff --git a/libs/framework/src/celix_scheduled_event.h 
b/libs/framework/src/celix_scheduled_event.h
index 18872e44..1334434f 100644
--- a/libs/framework/src/celix_scheduled_event.h
+++ b/libs/framework/src/celix_scheduled_event.h
@@ -29,6 +29,8 @@ extern "C" {
 
 typedef struct celix_scheduled_event celix_scheduled_event_t;
 
+#define CELIX_SCHEDULED_EVENT_TIMEOUT_WAIT_FOR_PROCESSING_IN_SECONDS 30.0
+
 /**
  * @brief Create a scheduled event for the given bundle.
  *
@@ -116,7 +118,7 @@ void celix_scheduledEvent_process(celix_scheduled_event_t* 
event, const struct t
  * @param[in] event The event to check.
  * @return  true if the event is a one-shot event and is done.
  */
-bool celix_scheduleEvent_isDone(celix_scheduled_event_t* event);
+bool celix_scheduleEvent_isSingleShotDone(celix_scheduled_event_t* event);
 
 /**
  * @brief Configure a scheduled event for a wakeup, so 
celix_scheduledEvent_deadlineReached will return true until
@@ -139,6 +141,16 @@ celix_status_t 
celix_scheduledEvent_waitForAtLeastCallCount(celix_scheduled_even
                                                             size_t 
targetCallCount,
                                                             double 
waitTimeInSeconds);
 
+
+/**
+ * @brief Wait for a scheduled event to be done with processing.
+ * @param[in] event The event to wait for.
+ * @note There is no timeout argument; the wait is bounded by CELIX_SCHEDULED_EVENT_TIMEOUT_WAIT_FOR_PROCESSING_IN_SECONDS.
+ * @return CELIX_SUCCESS if the scheduled event is done with processing, 
CELIX_TIMEOUT if the scheduled event
+ *         is not done with processing within that timeout.
+ */
+celix_status_t celix_scheduledEvent_waitForProcessing(celix_scheduled_event_t* 
event);
+
 #ifdef __cplusplus
 };
 #endif
diff --git a/libs/framework/src/framework.c b/libs/framework/src/framework.c
index 29a6c770..7e524e52 100644
--- a/libs/framework/src/framework.c
+++ b/libs/framework/src/framework.c
@@ -1450,7 +1450,7 @@ static double 
celix_framework_processScheduledEvents(celix_framework_t* fw) {
 
         if (callEvent != NULL) {
             celix_scheduledEvent_process(callEvent, &ts);
-            if (celix_scheduleEvent_isDone(callEvent)) {
+            if (celix_scheduleEvent_isSingleShotDone(callEvent)) {
                 celixThreadMutex_lock(&fw->dispatcher.mutex);
                 fw_log(fw->logger,
                        CELIX_LOG_LEVEL_DEBUG,
@@ -1502,7 +1502,8 @@ void 
celix_framework_cleanupScheduledEvents(celix_framework_t* fw, long bndId) {
                        eventName,
                        eventId,
                        eventBndId);
-            } 
+            }
+            celix_scheduledEvent_waitForProcessing(removedEvent);
             celix_scheduledEvent_release(removedEvent);
         }
     } while (removedEvent != NULL);
@@ -2637,6 +2638,7 @@ bool 
celix_framework_removeScheduledEvent(celix_framework_t* fw, bool errorIfNot
            celix_scheduledEvent_getName(event),
            celix_scheduledEvent_getId(event),
            celix_scheduledEvent_getBundleId(event));
+    celix_scheduledEvent_waitForProcessing(event);
     celix_scheduledEvent_release(event);
     return true;
 }

Reply via email to