This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/scheduled_event_on_event_thread in repository https://gitbox.apache.org/repos/asf/celix.git
commit 2e9073874a14ea0a70194e72f7bf20c5bfce786d Author: Pepijn Noltes <[email protected]> AuthorDate: Tue May 16 07:46:00 2023 +0200 Refactor scheduled event to support wakeups --- .clang-format | 27 ++++ .../log_admin/gtest/src/LogAdminTestSuite.cc | 24 +-- libs/framework/gtest/CMakeLists.txt | 1 - .../framework/gtest/src/ScheduledEventTestSuite.cc | 94 +++++++++++- .../ScheduledEventWithErrorInjectionTestSuite.cc | 14 -- libs/framework/include/celix/ScheduledEvent.h | 20 +++ libs/framework/include/celix_bundle_context.h | 32 +++- libs/framework/src/bundle_context.c | 7 + libs/framework/src/celix_scheduled_event.c | 168 ++++++++++++++++++--- libs/framework/src/celix_scheduled_event.h | 79 ++++++---- libs/framework/src/framework.c | 143 +++++++++++------- libs/framework/src/framework_private.h | 18 ++- libs/utils/gtest/src/ThreadsTestSuite.cc | 59 ++++++++ libs/utils/gtest/src/TimeUtilsTestSuite.cc | 50 +++++- libs/utils/include/celix_threads.h | 37 +++++ libs/utils/include/celix_utils.h | 8 + libs/utils/src/celix_threads.c | 20 +++ libs/utils/src/utils.c | 24 +++ 18 files changed, 679 insertions(+), 146 deletions(-) diff --git a/.clang-format b/.clang-format new file mode 100644 index 00000000..6f4dbba6 --- /dev/null +++ b/.clang-format @@ -0,0 +1,27 @@ +BasedOnStyle: LLVM +IndentWidth: 4 +AllowShortIfStatementsOnASingleLine: false +AlwaysBreakTemplateDeclarations: Yes +BinPackArguments: false +BinPackParameters: false +BreakBeforeBraces: Attach +ColumnLimit: 120 +ConstructorInitializerIndentWidth: 4 +Cpp11BracedListStyle: true +DerivePointerAlignment: false +IncludeBlocks: Regroup +KeepEmptyLinesAtTheStartOfBlocks: false +NamespaceIndentation: None +PointerAlignment: Left +ReflowComments: true +SortIncludes: false +SpaceAfterCStyleCast: false +SpaceBeforeAssignmentOperators: true +SpaceBeforeParens: ControlStatements +SpaceInEmptyParentheses: false +SpacesBeforeTrailingComments: 1 +SpacesInAngles: Never +SpacesInCStyleCastParentheses: false +SpacesInParentheses: 
false +SpacesInSquareBrackets: false +AlignEscapedNewlines: Right diff --git a/bundles/logging/log_admin/gtest/src/LogAdminTestSuite.cc b/bundles/logging/log_admin/gtest/src/LogAdminTestSuite.cc index b0a7220a..95b75a81 100644 --- a/bundles/logging/log_admin/gtest/src/LogAdminTestSuite.cc +++ b/bundles/logging/log_admin/gtest/src/LogAdminTestSuite.cc @@ -83,11 +83,11 @@ TEST_F(LogBundleTestSuite, NrOfLogServices) { EXPECT_EQ(1, control->nrOfLogServices(control->handle, nullptr)); //default the framework log services is available //request "default" log service - long trkId1 = celix_bundleContext_trackService(ctx.get(), CELIX_LOG_SERVICE_NAME, NULL, NULL); + long trkId1 = celix_bundleContext_trackService(ctx.get(), CELIX_LOG_SERVICE_NAME, nullptr, nullptr); EXPECT_EQ(2, control->nrOfLogServices(control->handle, nullptr)); //request "default" log service -> already created - long trkId2 = celix_bundleContext_trackService(ctx.get(), CELIX_LOG_SERVICE_NAME, NULL, NULL); + long trkId2 = celix_bundleContext_trackService(ctx.get(), CELIX_LOG_SERVICE_NAME, nullptr, nullptr); EXPECT_EQ(2, control->nrOfLogServices(control->handle, nullptr)); //request a 'logger1' log service @@ -224,7 +224,7 @@ TEST_F(LogBundleTestSuite, SinkLogControl) { TEST_F(LogBundleTestSuite, LogServiceControl) { //request "default" log service - long trkId1 = celix_bundleContext_trackService(ctx.get(), CELIX_LOG_SERVICE_NAME, NULL, NULL); + long trkId1 = celix_bundleContext_trackService(ctx.get(), CELIX_LOG_SERVICE_NAME, nullptr, nullptr); celix_framework_waitForEmptyEventQueue(fw.get()); EXPECT_EQ(2, control->nrOfLogServices(control->handle, nullptr)); @@ -328,7 +328,7 @@ TEST_F(LogBundleTestSuite, LogServiceAndSink) { //request a 'logger1' log service long trkId; - std::atomic<celix_log_service_t*> logSvc; + std::atomic<celix_log_service_t*> logSvc{}; { celix_service_tracking_options_t opts{}; opts.filter.serviceName = CELIX_LOG_SERVICE_NAME; @@ -342,6 +342,10 @@ TEST_F(LogBundleTestSuite, 
LogServiceAndSink) { } celix_framework_waitForEmptyEventQueue(fw.get()); + //TODO fixme, apparently this is needed, because the celix_framework_waitForEmptyEventQueue for does not work + //this is probably due to the moved broadcast in the framework event handle, check this + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + ASSERT_TRUE(logSvc.load() != nullptr); auto initial = count.load(); celix_log_service_t *ls = logSvc.load(); @@ -471,7 +475,7 @@ TEST_F(LogBundleTestSuite, LogAdminCmd) { opts.filter.serviceName = CELIX_SHELL_COMMAND_SERVICE_NAME; opts.use = [](void*, void *svc) { auto* cmd = static_cast<celix_shell_command_t*>(svc); - char *cmdResult = NULL; + char *cmdResult = nullptr; size_t cmdResultLen; FILE *ss = open_memstream(&cmdResult, &cmdResultLen); cmd->executeCommand(cmd->handle, "celix::log_admin", ss, ss); //overview @@ -487,7 +491,7 @@ TEST_F(LogBundleTestSuite, LogAdminCmd) { opts.use = [](void*, void *svc) { auto* cmd = static_cast<celix_shell_command_t*>(svc); - char *cmdResult = NULL; + char *cmdResult = nullptr; size_t cmdResultLen; FILE *ss = open_memstream(&cmdResult, &cmdResultLen); cmd->executeCommand(cmd->handle, "celix::log_admin log fatal", ss, ss); //all @@ -504,7 +508,7 @@ TEST_F(LogBundleTestSuite, LogAdminCmd) { opts.use = [](void*, void *svc) { auto* cmd = static_cast<celix_shell_command_t*>(svc); - char *cmdResult = NULL; + char *cmdResult = nullptr; size_t cmdResultLen; FILE *ss = open_memstream(&cmdResult, &cmdResultLen); cmd->executeCommand(cmd->handle, "celix::log_admin sink false", ss, ss); //all @@ -523,9 +527,9 @@ TEST_F(LogBundleTestSuite, LogAdminCmd) { opts.use = [](void*, void *svc) { auto* cmd = static_cast<celix_shell_command_t*>(svc); - char *cmdResult = NULL; + char *cmdResult = nullptr; size_t cmdResultLen; - char *errResult = NULL; + char *errResult = nullptr; size_t errResultLen; FILE *ss = open_memstream(&cmdResult, &cmdResultLen); FILE *es = open_memstream(&errResult, &errResultLen); @@ -586,7 
+590,7 @@ TEST_F(LogBundleTestSuite, LogAdminCmd) { opts.use = [](void*, void *svc) { auto* cmd = static_cast<celix_shell_command_t*>(svc); - char *cmdResult = NULL; + char *cmdResult = nullptr; size_t cmdResultLen; FILE *ss = open_memstream(&cmdResult, &cmdResultLen); cmd->executeCommand(cmd->handle, "celix::log_admin", ss, ss); diff --git a/libs/framework/gtest/CMakeLists.txt b/libs/framework/gtest/CMakeLists.txt index f2e1faa7..41eb9ba1 100644 --- a/libs/framework/gtest/CMakeLists.txt +++ b/libs/framework/gtest/CMakeLists.txt @@ -144,7 +144,6 @@ if (LINKER_WRAP_SUPPORTED) Celix::utils_ei Celix::asprintf_ei Celix::dlfcn_ei - Celix::array_list_ei GTest::gtest GTest::gtest_main ) diff --git a/libs/framework/gtest/src/ScheduledEventTestSuite.cc b/libs/framework/gtest/src/ScheduledEventTestSuite.cc index 160aab6c..295f8c27 100644 --- a/libs/framework/gtest/src/ScheduledEventTestSuite.cc +++ b/libs/framework/gtest/src/ScheduledEventTestSuite.cc @@ -26,7 +26,7 @@ class ScheduledEventTestSuite : public ::testing::Test { public: ScheduledEventTestSuite() { - fw = celix::createFramework({{"CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL", "info"}}); + fw = celix::createFramework({{"CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL", "trace"}}); } std::shared_ptr<celix::Framework> fw{}; @@ -180,9 +180,9 @@ TEST_F(ScheduledEventTestSuite, InvalidOptionsAndArgumentsTest) { //Then I expect an error EXPECT_LT(scheduledEventId, 0); - //celix_scheduleEvent_destroy and celix_scheduledEvent_waitAndDestroy can be called with NULL - celix_scheduledEvent_destroy(nullptr); - celix_scheduledEvent_waitAndDestroy(nullptr); + //celix_scheduleEvent_release and celix_scheduledEvent_retain can be called with NULL + celix_scheduledEvent_release(nullptr); + celix_scheduledEvent_retain(nullptr); //celix_bundleContext_removeScheduledEvent can handle invalid eventIds celix_bundleContext_removeScheduledEvent(ctx->getCBundleContext(), -1); @@ -193,3 +193,89 @@ TEST_F(ScheduledEventTestSuite, 
InvalidOptionsAndArgumentsTest) { ctx->getFramework()->getCFramework(), 404, nullptr, 0.0, 0.0, nullptr, [](void*) { /*nop*/ }); EXPECT_EQ(scheduledEventId, -1); } + +TEST_F(ScheduledEventTestSuite, WakeUpEventTest) { + // Given a counter scheduled event with a long initial delay is added + std::atomic<int> count{0}; + celix_scheduled_event_options_t opts{}; + opts.eventName = "test wakeup"; + opts.initialDelayInSeconds = 0.1; + opts.intervalInSeconds = 0.1; + opts.eventData = static_cast<void*>(&count); + opts.eventCallback = [](void* countPtr) { + auto* count = static_cast<std::atomic<int>*>(countPtr); + count->fetch_add(1); + }; + long scheduledEventId = celix_bundleContext_addScheduledEvent(fw->getFrameworkBundleContext()->getCBundleContext(), &opts); + ASSERT_NE(-1L, scheduledEventId); + EXPECT_EQ(0, count.load()); + + // When the scheduled event is woken up + celix_status_t status = celix_bundleContext_wakeupScheduledEvent(fw->getFrameworkBundleContext()->getCBundleContext(), scheduledEventId, 1); + + // Then the status is CELIX_SUCCESS + ASSERT_EQ(CELIX_SUCCESS, status); + + // And the count is increased + EXPECT_EQ(1, count.load()); + + // When waiting longer than the interval + std::this_thread::sleep_for(std::chrono::milliseconds{110}); + + // Then the count is increased + EXPECT_EQ(2, count.load()); + + // When the scheduled event is woken up again without waiting (waitTimeInSec = 0) + status = celix_bundleContext_wakeupScheduledEvent(fw->getFrameworkBundleContext()->getCBundleContext(), scheduledEventId, 0); + + // And the process is delayed to ensure the event is called + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + + // Then the status is CELIX_SUCCESS + ASSERT_EQ(CELIX_SUCCESS, status); + + // And the count is increased + EXPECT_EQ(3, count.load()); + + // When the scheduled event is woken up again + status = celix_bundleContext_wakeupScheduledEvent(fw->getFrameworkBundleContext()->getCBundleContext(), scheduledEventId, 1); + + // 
Then the status is CELIX_SUCCESS + ASSERT_EQ(CELIX_SUCCESS, status); + + // And the count is increased + EXPECT_EQ(4, count.load()); + + celix_bundleContext_removeScheduledEvent(fw->getFrameworkBundleContext()->getCBundleContext(), scheduledEventId); +} + +TEST_F(ScheduledEventTestSuite, WakeUpOneShotEventTest) { + // Given a counter scheduled event with a long initial delay is added + std::atomic<int> count{0}; + celix_scheduled_event_options_t opts{}; + opts.eventName = "test one-shot wakeup"; + opts.initialDelayInSeconds = 5; + opts.eventData = static_cast<void*>(&count); + opts.eventCallback = [](void* countPtr) { + auto* count = static_cast<std::atomic<int>*>(countPtr); + count->fetch_add(1); + }; + long scheduledEventId = celix_bundleContext_addScheduledEvent(fw->getFrameworkBundleContext()->getCBundleContext(), &opts); + ASSERT_NE(-1L, scheduledEventId); + EXPECT_EQ(0, count.load()); + + // When the scheduled event is woken up + celix_status_t status = celix_bundleContext_wakeupScheduledEvent(fw->getFrameworkBundleContext()->getCBundleContext(), scheduledEventId, 1); + + // Then the status is CELIX_SUCCESS + ASSERT_EQ(CELIX_SUCCESS, status); + + // And the count is increased + EXPECT_EQ(1, count.load()); + + // And when the scheduled event is woken up again + status = celix_bundleContext_wakeupScheduledEvent(fw->getFrameworkBundleContext()->getCBundleContext(), scheduledEventId, 0); + + // Then the status is ILLEGAL_ARGUMENT, because the scheduled event is already woken up and a one-shot event + ASSERT_EQ(CELIX_ILLEGAL_ARGUMENT, status); +} diff --git a/libs/framework/gtest/src/ScheduledEventWithErrorInjectionTestSuite.cc b/libs/framework/gtest/src/ScheduledEventWithErrorInjectionTestSuite.cc index d3bb2ef2..cf56bd3f 100644 --- a/libs/framework/gtest/src/ScheduledEventWithErrorInjectionTestSuite.cc +++ b/libs/framework/gtest/src/ScheduledEventWithErrorInjectionTestSuite.cc @@ -24,7 +24,6 @@ #include "celix_scheduled_event.h" #include "framework_private.h" 
-#include "celix_array_list_ei.h" #include "malloc_ei.h" class ScheduledEventWithErrorInjectionTestSuite : public ::testing::Test { @@ -34,25 +33,12 @@ public: } ~ScheduledEventWithErrorInjectionTestSuite() noexcept override { - celix_ei_expect_celix_arrayList_add(nullptr, 0, CELIX_SUCCESS); celix_ei_expect_malloc(nullptr, 0, nullptr); } std::shared_ptr<celix::Framework> fw{}; }; -TEST_F(ScheduledEventWithErrorInjectionTestSuite, ArrayListAddFailsTest) { - //Given celix_arrayList_add is primed to fail on the first call from addScheduledEvent (whitebox knowledge) - celix_ei_expect_celix_arrayList_add((void*)celix_bundleContext_addScheduledEvent, 1, CELIX_ENOMEM); - - //When a scheduled event is added - celix_scheduled_event_options_t opts{}; - opts.eventCallback = [](void*){/*nop*/}; - long scheduledEventId = celix_bundleContext_addScheduledEvent(fw->getFrameworkBundleContext()->getCBundleContext(), &opts); - - //Then the scheduled event id is -1 (error) - EXPECT_EQ(-1L, scheduledEventId); -} TEST_F(ScheduledEventWithErrorInjectionTestSuite, MallocFailsTest) { //Given malloc is primed to fail on the first call from celix_scheduledEvent_create (whitebox knowledge) diff --git a/libs/framework/include/celix/ScheduledEvent.h b/libs/framework/include/celix/ScheduledEvent.h new file mode 100644 index 00000000..c1aa1df0 --- /dev/null +++ b/libs/framework/include/celix/ScheduledEvent.h @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once diff --git a/libs/framework/include/celix_bundle_context.h b/libs/framework/include/celix_bundle_context.h index c0e15bf0..cbd644d6 100644 --- a/libs/framework/include/celix_bundle_context.h +++ b/libs/framework/include/celix_bundle_context.h @@ -1268,7 +1268,7 @@ typedef struct celix_scheduled_event_options { double initialDelayInSeconds CELIX_OPTS_INIT; /**< @brief Initial delay in seconds before the event is processed.*/ double intervalInSeconds CELIX_OPTS_INIT; /**< @brief Schedule interval in seconds. - * 0 means one shot scheduled event. + * 0 means one-shot scheduled event. */ void* eventData CELIX_OPTS_INIT; /**< @brief Data passed to the eventCallback function when a event is scheduled.*/ @@ -1287,15 +1287,15 @@ typedef struct celix_scheduled_event_options { * after other events in the Celix event thread. * The target - but not guaranteed - precision of the scheduled event trigger is 1 microsecond. * - * If the provided interval is 0, the scheduled event will a one short scheduled event and will be called once - * after the provided initial delay. If a bundle stops before the one short scheduled event is called, the scheduled + * If the provided interval is 0, the scheduled event will a one-shot scheduled event and will be called once + * after the provided initial delay. If a bundle stops before the one-shot scheduled event is called, the scheduled * event will be removed and not called. * - * Scheduled events should be removed by the caller when not needed anymore. 
- * Exception are the one shot scheduled events, these are automatically removed after the event callback is called. + * Scheduled events should be removed by the caller when not needed anymore, except for one-shot scheduled events. + * One-shot scheduled events are automatically removed after the event callback is called. * - * Note during bundle stop the framework will check if all scheduled events for the bundle are removed. - * For every not removed scheduled event, which is not a one short event, a warning will be logged and the + * Note during bundle stop the framework will check if all scheduled events for the bundle are removed. + * For every not removed scheduled event that is not a one-shot event, a warning will be logged and the * scheduled event will be removed. * * @param[in] ctx The bundle context. @@ -1306,6 +1306,24 @@ typedef struct celix_scheduled_event_options { CELIX_FRAMEWORK_EXPORT long celix_bundleContext_addScheduledEvent(celix_bundle_context_t* ctx, const celix_scheduled_event_options_t* options); +/** + * @brief Wakeup a scheduled event. + * + * If waitTimeInSeconds is not 0, this function will block until the scheduled event callback is called. + * If waitTimeInSeconds is 0, this function will return immediately. + * + * @param[in] ctx The bundle context. + * @param[in] scheduledEventId The scheduled event id to wakeup. + * @param[in] waitTimeInSeconds If not 0, this function will block until the scheduled event callback + * is called or the provided timeout is reached. + * @return CELIX_SUCCESS if the scheduled event is woken up, CELIX_ILLEGAL_ARGUMENT if the scheduled event id is not + * known and CELIX_TIMEOUT if the waitTimeInSeconds is reached. + */ +CELIX_FRAMEWORK_EXPORT celix_status_t celix_bundleContext_wakeupScheduledEvent( + celix_bundle_context_t* ctx, + long scheduledEventId, + double waitTimeInSeconds); + /** * @brief Cancel and remove a scheduled event. 
* diff --git a/libs/framework/src/bundle_context.c b/libs/framework/src/bundle_context.c index f60ea47b..01c44143 100644 --- a/libs/framework/src/bundle_context.c +++ b/libs/framework/src/bundle_context.c @@ -1494,6 +1494,13 @@ long celix_bundleContext_addScheduledEvent(celix_bundle_context_t* ctx, options->eventCallback); } +celix_status_t celix_bundleContext_wakeupScheduledEvent( + celix_bundle_context_t* ctx, + long scheduledEventId, + double waitTimeInSeconds) { + return celix_framework_wakeupScheduledEvent(ctx->framework, scheduledEventId, waitTimeInSeconds); +} + bool celix_bundleContext_removeScheduledEvent(celix_bundle_context_t* ctx, long scheduledEventId) { return celix_framework_removeScheduledEvent(ctx->framework, scheduledEventId); } diff --git a/libs/framework/src/celix_scheduled_event.c b/libs/framework/src/celix_scheduled_event.c index 4c7c687e..6ef8533a 100644 --- a/libs/framework/src/celix_scheduled_event.c +++ b/libs/framework/src/celix_scheduled_event.c @@ -22,6 +22,54 @@ #include "celix_scheduled_event.h" #include "celix_utils.h" +/*! + * @brief Allow error in seconds for the interval. This ensures pthread cond wakeups result in a call even if + * the exact wakeup time is a bit off. + */ +#define CELIX_SCHEDULED_EVENT_INTERVAL_ALLOW_ERROR_IN_SECONDS 0.000001 + +/** + * @brief Default name for a scheduled event. + */ +static const char* const CELIX_SCHEDULED_EVENT_DEFAULT_NAME = "unnamed"; + +/** + * @brief Struct representing a scheduled event. + * + * A scheduled event is an event that is scheduled to be executed at a certain initial delay and/or interval. + * It is created using the `celix_bundleContext_addScheduledEvent` function and can be woken up + * using the `celix_bundleContext_wakeupScheduledEvent` function. + * + * The struct contains information about the scheduled event, such as the event name, initial delay, + * interval, and callback function. 
It also contains synchronization primitives to protect the use + * count and call count of the event. + * + * @see celix_bundleContext_addScheduledEvent + * @see celix_bundleContext_wakeupScheduledEvent + */ +struct celix_scheduled_event { + long scheduledEventId; /**< The ID of the scheduled event. */ + celix_framework_logger_t* logger; /**< The framework logger used to log information */ + celix_framework_bundle_entry_t* + bndEntry; /**< The bundle entry for the scheduled event. Note the scheduled event keeps a use count on the + bundle entry and decreased this during the destruction of the scheduled event. */ + + char* eventName; /**< The name of the scheduled event. Will be CELIX_SCHEDULED_EVENT_DEFAULT_NAME if no name is + provided during creation. */ + double initialDelayInSeconds; /**< The initial delay of the scheduled event in seconds. */ + double intervalInSeconds; /**< The interval of the scheduled event in seconds. */ + void* eventCallbackData; /**< The data for the scheduled event callback. */ + void (*eventCallback)(void* eventData); /**< The callback function for the scheduled event. */ + + celix_thread_mutex_t mutex; /**< The mutex to protect the data below. */ + celix_thread_cond_t cond; /**< The condition variable to signal the scheduled event for a changed callCount. */ + size_t useCount; /**< The use count of the scheduled event. */ + size_t callCount; /**< The call count of the scheduled event. */ + struct timespec lastScheduledEventTime; /**< The last scheduled event time of the scheduled event. */ + bool processForWakeup; /**< Whether the scheduled event should be processed directly due to a wakeupScheduledEvent + call. 
*/ +}; + celix_scheduled_event_t* celix_scheduledEvent_create(celix_framework_logger_t* logger, celix_framework_bundle_entry_t* bndEntry, long scheduledEventId, @@ -31,10 +79,13 @@ celix_scheduled_event_t* celix_scheduledEvent_create(celix_framework_logger_t* l void* eventData, void (*eventCallback)(void* eventData)) { celix_scheduled_event_t* event = malloc(sizeof(*event)); - char* eventName = providedEventName == NULL ? (char*)CELIX_SCHEDULED_EVENT_DEFAULT_NAME - : celix_utils_strdup(providedEventName); + char* eventName = + providedEventName == NULL ? (char*)CELIX_SCHEDULED_EVENT_DEFAULT_NAME : celix_utils_strdup(providedEventName); if (event == NULL || eventName == NULL) { - fw_log(logger, CELIX_LOG_LEVEL_ERROR, "Cannot add scheduled event for bundle id %li. Out of memory", bndEntry->bndId); + fw_log(logger, + CELIX_LOG_LEVEL_ERROR, + "Cannot add scheduled event for bundle id %li. Out of memory", + bndEntry->bndId); free(event); if (eventName != CELIX_SCHEDULED_EVENT_DEFAULT_NAME) { free(eventName); @@ -52,40 +103,66 @@ celix_scheduled_event_t* celix_scheduledEvent_create(celix_framework_logger_t* l event->intervalInSeconds = intervalInSeconds; event->eventCallbackData = eventData; event->eventCallback = eventCallback; - event->useCount = 0; + event->useCount = 1; event->callCount = 0; clock_gettime(CLOCK_MONOTONIC, &event->lastScheduledEventTime); + event->processForWakeup = false; + celixThreadMutex_create(&event->mutex, NULL); celixThreadCondition_init(&event->cond, NULL); return event; } -void celix_scheduledEvent_destroy(celix_scheduled_event_t* event) { - if (event != NULL) { - celix_framework_bundleEntry_decreaseUseCount(event->bndEntry); - celixThreadMutex_destroy(&event->mutex); - celixThreadCondition_destroy(&event->cond); - if (event->eventName != CELIX_SCHEDULED_EVENT_DEFAULT_NAME) { - free(event->eventName); - } - free(event); +static void celix_scheduledEvent_destroy(celix_scheduled_event_t* event) { + 
celix_framework_bundleEntry_decreaseUseCount(event->bndEntry); + celixThreadMutex_destroy(&event->mutex); + celixThreadCondition_destroy(&event->cond); + if (event->eventName != CELIX_SCHEDULED_EVENT_DEFAULT_NAME) { + free(event->eventName); } + free(event); } -void celix_scheduledEvent_waitAndDestroy(celix_scheduled_event_t* event) { +void celix_scheduledEvent_retain(celix_scheduled_event_t* event) { if (event == NULL) { return; } - //wait for use count 0; celixThreadMutex_lock(&event->mutex); - while (event->useCount) { - celixThreadCondition_wait(&event->cond, &event->mutex); + event->useCount += 1; + celixThreadMutex_unlock(&event->mutex); +} + +void celix_scheduledEvent_release(celix_scheduled_event_t* event) { + if (event == NULL) { + return; } + + celixThreadMutex_lock(&event->mutex); + event->useCount -= 1; + bool unused = event->useCount == 0; celixThreadMutex_unlock(&event->mutex); - celix_scheduledEvent_destroy(event); + if (unused) { + celix_scheduledEvent_destroy(event); + } +} + +const char* celix_scheduledEvent_getName(const celix_scheduled_event_t* event) { return event->eventName; } + +long celix_scheduledEvent_getId(const celix_scheduled_event_t* event) { return event->scheduledEventId; } + +double celix_scheduledEvent_getInitialDelayInSeconds(const celix_scheduled_event_t* event) { + return event->initialDelayInSeconds; +} + +double celix_scheduledEvent_getIntervalInSeconds(const celix_scheduled_event_t* event) { + return event->intervalInSeconds; +} + +celix_framework_bundle_entry_t* celix_scheduledEvent_getBundleEntry(const celix_scheduled_event_t* event) { + return event->bndEntry; } bool celix_scheduledEvent_deadlineReached(celix_scheduled_event_t* event, @@ -96,6 +173,10 @@ bool celix_scheduledEvent_deadlineReached(celix_scheduled_event_t* event, double deadline = event->callCount == 0 ? 
event->initialDelayInSeconds : event->intervalInSeconds; deadline -= CELIX_SCHEDULED_EVENT_INTERVAL_ALLOW_ERROR_IN_SECONDS; bool deadlineReached = elapsed >= deadline; + if (event->processForWakeup) { + deadlineReached = true; + } + if (deadlineReached) { *nextProcessTimeInSeconds = event->intervalInSeconds == 0 /*one shot*/ ? CELIX_FRAMEWORK_DEFAULT_MAX_TIMEDWAIT_EVENT_HANDLER_IN_SECONDS @@ -108,6 +189,12 @@ bool celix_scheduledEvent_deadlineReached(celix_scheduled_event_t* event, } void celix_scheduledEvent_process(celix_scheduled_event_t* event, const struct timespec* currentTime) { + fw_log(event->logger, + CELIX_LOG_LEVEL_TRACE, + "Processing scheduled event %s for bundle id %li", + event->eventName, + event->bndEntry->bndId); + void (*callback)(void*) = NULL; void* callbackData = NULL; @@ -115,21 +202,20 @@ void celix_scheduledEvent_process(celix_scheduled_event_t* event, const struct t callback = event->eventCallback; callbackData = event->eventCallbackData; event->useCount += 1; - celixThreadCondition_broadcast(&event->cond); //broadcast for changed useCount celixThreadMutex_unlock(&event->mutex); assert(callback != NULL); - callback(callbackData); //note called outside of lock + callback(callbackData); // note called outside of lock celixThreadMutex_lock(&event->mutex); event->lastScheduledEventTime = *currentTime; event->useCount -= 1; event->callCount += 1; - celixThreadCondition_broadcast(&event->cond); //broadcast for changed useCount + event->processForWakeup = false; + celixThreadCondition_broadcast(&event->cond); // broadcast for changed callCount celixThreadMutex_unlock(&event->mutex); } - bool celix_scheduleEvent_isDone(celix_scheduled_event_t* event) { bool isDone = false; celixThreadMutex_lock(&event->mutex); @@ -137,3 +223,41 @@ bool celix_scheduleEvent_isDone(celix_scheduled_event_t* event) { celixThreadMutex_unlock(&event->mutex); return isDone; } + +size_t celix_scheduledEvent_configureWakeup(celix_scheduled_event_t* event) { + 
celixThreadMutex_lock(&event->mutex); + event->processForWakeup = true; + size_t currentCallCount = event->callCount; + celixThreadMutex_unlock(&event->mutex); + + fw_log(event->logger, + CELIX_LOG_LEVEL_DEBUG, + "Wakeup scheduled event '%s' (id=%li) for bundle '%s' (id=%li)", + event->eventName, + event->scheduledEventId, + celix_bundle_getSymbolicName(event->bndEntry->bnd), + event->bndEntry->bndId); + + return currentCallCount + 1; +} + +celix_status_t celix_scheduledEvent_waitForAtLeastCallCount(celix_scheduled_event_t* event, + size_t targetCallCount, + double waitTimeInSeconds) { + celix_status_t status = CELIX_SUCCESS; + if (waitTimeInSeconds > 0) { + struct timespec start = celix_gettime(CLOCK_MONOTONIC); + struct timespec absTimeoutTime = celix_addDelayInSecondsToTime(&start, waitTimeInSeconds); + celixThreadMutex_lock(&event->mutex); + while (event->callCount < targetCallCount) { + celixThreadCondition_waitUntil(&event->cond, &event->mutex, &absTimeoutTime); + struct timespec now = celix_gettime(CLOCK_MONOTONIC); + if (celix_difftime(&start, &now) > waitTimeInSeconds) { + status = CELIX_TIMEOUT; + break; + } + } + celixThreadMutex_unlock(&event->mutex); + } + return status; +} diff --git a/libs/framework/src/celix_scheduled_event.h b/libs/framework/src/celix_scheduled_event.h index d1976152..87e9d14a 100644 --- a/libs/framework/src/celix_scheduled_event.h +++ b/libs/framework/src/celix_scheduled_event.h @@ -27,31 +27,13 @@ extern "C" { #endif -//Allow, 1 microsecond error in interval to ensure pthread cond wakeups result in a call. 
-#define CELIX_SCHEDULED_EVENT_INTERVAL_ALLOW_ERROR_IN_SECONDS 0.000001 - -static const char* const CELIX_SCHEDULED_EVENT_DEFAULT_NAME = "unnamed"; - -typedef struct celix_scheduled_event { - long scheduledEventId; - celix_framework_logger_t* logger; - celix_framework_bundle_entry_t* bndEntry; - - char* eventName; - double initialDelayInSeconds; - double intervalInSeconds; - void* eventCallbackData; - void (*eventCallback)(void* eventData); - - celix_thread_mutex_t mutex; //protects below - celix_thread_cond_t cond; - size_t useCount; //use count, how many times the event is used to process a eventCallback or doneCallback. - size_t callCount; //nr of times the eventCallback is called - struct timespec lastScheduledEventTime; -} celix_scheduled_event_t; +typedef struct celix_scheduled_event celix_scheduled_event_t; /** * @brief Create a scheduled event for the given bundle. + * + * The scheduled event will be created with a use count of 1. + * * @param[in] bndEntry The bundle entry for which the scheduled event is created. * @param[in] scheduledEventId The id of the scheduled event. * @param[in] eventName The name of the event. If NULL, CELIX_SCHEDULED_EVENT_DEFAULT_NAME is used. @@ -71,14 +53,41 @@ celix_scheduled_event_t* celix_scheduledEvent_create(celix_framework_logger_t* l void (*eventCallback)(void* eventData)); /** - * @brief Destroy the event. + * @brief Retain the scheduled event by increasing the use count. + * Will silently ignore a NULL event. + */ +void celix_scheduledEvent_retain(celix_scheduled_event_t* event); + +/** + * @brief Release the scheduled event by decreasing the use count. If the use count is 0, + * the scheduled event is destroyed. Will silently ignore a NULL event. + */ +void celix_scheduledEvent_release(celix_scheduled_event_t* event); + +/** + * @brief Returns the scheduled event name. 
*/ -void celix_scheduledEvent_destroy(celix_scheduled_event_t* event); +const char* celix_scheduledEvent_getName(const celix_scheduled_event_t* event); /** - * @brief Wait until the useCount is 0 and destroy the event. + * @brief Returns the scheduled event ID. */ -void celix_scheduledEvent_waitAndDestroy(celix_scheduled_event_t* event); +long celix_scheduledEvent_getId(const celix_scheduled_event_t* event); + +/** + * @brief Returns the initial delay of the scheduled event in seconds. + */ +double celix_scheduledEvent_getInitialDelayInSeconds(const celix_scheduled_event_t* event); + +/** + * @brief Returns the interval of the scheduled event in seconds. + */ +double celix_scheduledEvent_getIntervalInSeconds(const celix_scheduled_event_t* event); + +/** + * @brief Returns the framework bundle entry for this scheduled event. + */ +celix_framework_bundle_entry_t* celix_scheduledEvent_getBundleEntry(const celix_scheduled_event_t* event); /** * @brief Returns whether the event deadline is reached and the event should be processed. @@ -110,19 +119,29 @@ void celix_scheduledEvent_process(celix_scheduled_event_t* event, const struct t */ bool celix_scheduleEvent_isDone(celix_scheduled_event_t* event); +/** + * @brief Configure a scheduled event for a wakeup, so celix_scheduledEvent_deadlineReached will return true until + * the event is processed. + * + * @param[in] event The event to configure for wakeup. + * @return The future call count of the event after the next processing is done. + */ +size_t celix_scheduledEvent_configureWakeup(celix_scheduled_event_t* event); + /** * @brief Wait for a scheduled event to reach at least the provided call count. + * Will directly (non blocking) return if the call count is already reached or waitTimeInSeconds is <= 0 * @param[in] event The event to wait for. * @param[in] callCount The call count to wait for. * @param[in] timeout The max time to wait in seconds. 
* @return CELIX_SUCCESS if the scheduled event reached the call count, CELIX_TIMEOUT if the scheduled event - * did not reach the call count within the timeout. */ -celix_status_t -celix_scheduledEvent_waitForAtLeastCallCount(celix_scheduled_event_t* event, size_t useCount, double timeout); +celix_status_t celix_scheduledEvent_waitForAtLeastCallCount(celix_scheduled_event_t* event, + size_t targetCallCount, + double waitTimeInSeconds); #ifdef __cplusplus }; #endif -#endif //CELIX_CELIX_SCHEDULED_EVENT_H +#endif // CELIX_CELIX_SCHEDULED_EVENT_H diff --git a/libs/framework/src/framework.c b/libs/framework/src/framework.c index b617438c..f0d9b860 100644 --- a/libs/framework/src/framework.c +++ b/libs/framework/src/framework.c @@ -248,7 +248,7 @@ celix_status_t framework_create(framework_pt *out, celix_properties_t* config) { framework->dispatcher.eventQueueCap = (int)celix_framework_getConfigPropertyAsLong(framework, CELIX_FRAMEWORK_STATIC_EVENT_QUEUE_SIZE, CELIX_FRAMEWORK_DEFAULT_STATIC_EVENT_QUEUE_SIZE, NULL); framework->dispatcher.eventQueue = malloc(sizeof(celix_framework_event_t) * framework->dispatcher.eventQueueCap); framework->dispatcher.dynamicEventQueue = celix_arrayList_create(); - framework->dispatcher.scheduledEventQueue = celix_arrayList_create(); + framework->dispatcher.scheduledEvents = celix_longHashMap_create(); //create and store framework uuid char uuid[37]; @@ -381,8 +381,8 @@ celix_status_t framework_destroy(framework_pt framework) { assert(celix_arrayList_size(framework->dispatcher.dynamicEventQueue) == 0); celix_arrayList_destroy(framework->dispatcher.dynamicEventQueue); - assert(celix_arrayList_size(framework->dispatcher.scheduledEventQueue) == 0); - celix_arrayList_destroy(framework->dispatcher.scheduledEventQueue); + assert(celix_longHashMap_size(framework->dispatcher.scheduledEvents) == 0); + celix_longHashMap_destroy(framework->dispatcher.scheduledEvents); celix_bundleCache_destroy(framework->cache); @@ -1439,14 +1439,14 @@ static double 
celix_framework_processScheduledEvents(celix_framework_t* fw) { callEvent = NULL; double nextEvent; celixThreadMutex_lock(&fw->dispatcher.mutex); - for (int i = 0; i< celix_arrayList_size(fw->dispatcher.scheduledEventQueue); ++i) { - celix_scheduled_event_t* event = celix_arrayList_get(fw->dispatcher.scheduledEventQueue, i); - bool call = celix_scheduledEvent_deadlineReached(event, &ts, &nextEvent); + CELIX_LONG_HASH_MAP_ITERATE(fw->dispatcher.scheduledEvents, entry) { + celix_scheduled_event_t* visit = entry.value.ptrValue; + bool call = celix_scheduledEvent_deadlineReached(visit, &ts, &nextEvent); if (nextScheduledEvent == 0.0 || nextEvent < nextScheduledEvent) { nextScheduledEvent = nextEvent; } if (call) { - callEvent = event; + callEvent = visit; break; } } @@ -1459,13 +1459,13 @@ static double celix_framework_processScheduledEvents(celix_framework_t* fw) { fw_log(fw->logger, CELIX_LOG_LEVEL_DEBUG, "Removing processed one-shot scheduled event '%s' (id=%li) for bundle '%s' (id=%li)", - callEvent->eventName, - callEvent->scheduledEventId, - celix_bundle_getSymbolicName(callEvent->bndEntry->bnd), - callEvent->bndEntry->bndId); - celix_arrayList_remove(fw->dispatcher.scheduledEventQueue, callEvent); + celix_scheduledEvent_getName(callEvent), + celix_scheduledEvent_getId(callEvent), + celix_bundle_getSymbolicName(celix_scheduledEvent_getBundleEntry(callEvent)->bnd), + celix_scheduledEvent_getBundleEntry(callEvent)->bndId); + celix_longHashMap_remove(fw->dispatcher.scheduledEvents, celix_scheduledEvent_getId(callEvent)); + celix_scheduledEvent_release(callEvent); celixThreadMutex_unlock(&fw->dispatcher.mutex); - celix_scheduledEvent_destroy(callEvent); } } } while (callEvent != NULL); @@ -1478,35 +1478,40 @@ void celix_framework_cleanupScheduledEvents(celix_framework_t* fw, long bndId) { do { removedEvent = NULL; celixThreadMutex_lock(&fw->dispatcher.mutex); - for (int i = 0; i< celix_arrayList_size(fw->dispatcher.scheduledEventQueue); ++i) { - 
celix_scheduled_event_t* visit = celix_arrayList_get(fw->dispatcher.scheduledEventQueue, i); - if (visit->bndEntry->bndId == bndId) { + CELIX_LONG_HASH_MAP_ITERATE(fw->dispatcher.scheduledEvents, entry) { + celix_scheduled_event_t* visit = entry.value.ptrValue; + celix_framework_bundle_entry_t* bndEntry = celix_scheduledEvent_getBundleEntry(visit); + if (bndEntry->bndId == bndId) { + celix_longHashMap_remove(fw->dispatcher.scheduledEvents, celix_scheduledEvent_getId(visit)); removedEvent = visit; - celix_arrayList_removeAt(fw->dispatcher.scheduledEventQueue, i); break; } } celixThreadMutex_unlock(&fw->dispatcher.mutex); if (removedEvent) { - if (removedEvent->intervalInSeconds > 0) { + celix_framework_bundle_entry_t* bndEntry = celix_scheduledEvent_getBundleEntry(removedEvent); + const char* eventName = celix_scheduledEvent_getName(removedEvent); + double interval = celix_scheduledEvent_getIntervalInSeconds(removedEvent); + long eventId = celix_scheduledEvent_getId(removedEvent); + if (interval > 0) { fw_log(fw->logger, CELIX_LOG_LEVEL_WARNING, "Removing dangling scheduled event '%s' (id=%li) for bundle '%s' (id=%li). 
This should have been cleaned up by the bundle.", - removedEvent->eventName, - removedEvent->scheduledEventId, - celix_bundle_getSymbolicName(removedEvent->bndEntry->bnd), - removedEvent->bndEntry->bndId); + eventName, + eventId, + celix_bundle_getSymbolicName(bndEntry->bnd), + bndEntry->bndId); } else { fw_log(fw->logger, CELIX_LOG_LEVEL_DEBUG, "Removing unprocessed one-shot scheduled event '%s' (id=%li) for bundle '%s' (id=%li)", - removedEvent->eventName, - removedEvent->scheduledEventId, - celix_bundle_getSymbolicName(removedEvent->bndEntry->bnd), - removedEvent->bndEntry->bndId); - } - celix_scheduledEvent_waitAndDestroy(removedEvent); + eventName, + eventId, + celix_bundle_getSymbolicName(bndEntry->bnd), + bndEntry->bndId); + } + celix_scheduledEvent_release(removedEvent); } } while (removedEvent != NULL); } @@ -2554,29 +2559,61 @@ long celix_framework_addScheduledEvent(celix_framework_t* fw, eventCallback); if (event == NULL) { - return -1; //error logged by celix_scheduledEvent_create + return -1L; //error logged by celix_scheduledEvent_create } + fw_log(fw->logger, + CELIX_LOG_LEVEL_DEBUG, + "Added scheduled event '%s' (id=%li) for bundle '%s' (id=%li)", + celix_scheduledEvent_getName(event), + celix_scheduledEvent_getId(event), + celix_bundle_getSymbolicName(bndEntry->bnd), + bndId); + celixThreadMutex_lock(&fw->dispatcher.mutex); - celix_status_t addStatus = celix_arrayList_add(fw->dispatcher.scheduledEventQueue, event); + celix_longHashMap_put(fw->dispatcher.scheduledEvents, celix_scheduledEvent_getId(event), event); celixThreadCondition_broadcast(&fw->dispatcher.cond); //notify dispatcher thread for newly added scheduled event celixThreadMutex_unlock(&fw->dispatcher.mutex); - if (addStatus != CELIX_SUCCESS) { - fw_log(fw->logger, CELIX_LOG_LEVEL_ERROR, "Cannot add scheduled event for bundle id %li. 
Out of memory", bndId); - celix_scheduledEvent_destroy(event); - return -1; + return celix_scheduledEvent_getId(event); +} + +celix_status_t celix_framework_wakeupScheduledEvent(celix_framework_t* fw, + long scheduledEventId, + double waitTimeInSeconds) { + celixThreadMutex_lock(&fw->dispatcher.mutex); + celix_scheduled_event_t* event = celix_longHashMap_get(fw->dispatcher.scheduledEvents, scheduledEventId); + if (event != NULL) { + celix_scheduledEvent_retain(event); } + celixThreadMutex_unlock(&fw->dispatcher.mutex); - fw_log(fw->logger, - CELIX_LOG_LEVEL_DEBUG, - "Added scheduled event '%s' (id=%li) for bundle '%s' (id=%li)", - event->eventName, - event->scheduledEventId, - celix_bundle_getSymbolicName(bndEntry->bnd), - bndId); + if (event == NULL) { + fw_log(fw->logger, + CELIX_LOG_LEVEL_WARNING, + "celix_framework_wakeupScheduledEvent called with unknown scheduled event id %li", + scheduledEventId); + return CELIX_ILLEGAL_ARGUMENT; + } + + + size_t callCountAfterWakup = celix_scheduledEvent_configureWakeup(event); + celixThreadMutex_lock(&fw->dispatcher.mutex); + celixThreadCondition_broadcast(&fw->dispatcher.cond); //notify dispatcher thread for configured wakeup + celixThreadMutex_unlock(&fw->dispatcher.mutex); - return event->scheduledEventId; + celix_status_t status = CELIX_SUCCESS; + if (waitTimeInSeconds > 0) { + if (celix_framework_isCurrentThreadTheEventLoop(fw)) { + fw_log(fw->logger, CELIX_LOG_LEVEL_WARNING, "celix_framework_wakeupScheduledEvent called from the " + "event loop thread. 
This can result in a deadlock!"); + } + status = celix_scheduledEvent_waitForAtLeastCallCount(event, callCountAfterWakup, waitTimeInSeconds); + } + celix_scheduledEvent_release(event); + + + return status; } bool celix_framework_removeScheduledEvent(celix_framework_t* fw, long scheduledEventId) { @@ -2584,16 +2621,9 @@ bool celix_framework_removeScheduledEvent(celix_framework_t* fw, long scheduledE return false; //silently ignore } - celix_scheduled_event_t* event = NULL; celixThreadMutex_lock(&fw->dispatcher.mutex); - for (int i = 0; i < celix_arrayList_size(fw->dispatcher.scheduledEventQueue); ++i) { - celix_scheduled_event_t* visit = celix_arrayList_get(fw->dispatcher.scheduledEventQueue, i); - if (visit->scheduledEventId == scheduledEventId) { - event = visit; - celix_arrayList_removeAt(fw->dispatcher.scheduledEventQueue, i); - break; - } - } + celix_scheduled_event_t* event = celix_longHashMap_get(fw->dispatcher.scheduledEvents, scheduledEventId); + celix_longHashMap_remove(fw->dispatcher.scheduledEvents, scheduledEventId); celixThreadMutex_unlock(&fw->dispatcher.mutex); if (event == NULL) { @@ -2602,14 +2632,15 @@ bool celix_framework_removeScheduledEvent(celix_framework_t* fw, long scheduledE return false; } + celix_framework_bundle_entry_t* bndEntry = celix_scheduledEvent_getBundleEntry(event); fw_log(fw->logger, CELIX_LOG_LEVEL_DEBUG, "Removing scheduled event '%s' (id=%li) for bundle '%s' (id=%li)", - event->eventName, - event->scheduledEventId, - celix_bundle_getSymbolicName(event->bndEntry->bnd), - event->bndEntry->bndId); - celix_scheduledEvent_waitAndDestroy(event); + celix_scheduledEvent_getName(event), + celix_scheduledEvent_getId(event), + celix_bundle_getSymbolicName(bndEntry->bnd), + bndEntry->bndId); + celix_scheduledEvent_release(event); return true; } diff --git a/libs/framework/src/framework_private.h b/libs/framework/src/framework_private.h index 672a0a8e..0a031719 100644 --- a/libs/framework/src/framework_private.h +++ 
b/libs/framework/src/framework_private.h @@ -184,7 +184,7 @@ struct celix_framework { int nbUnregister; // number of pending async de-registration int nbEvent; // number of pending generic events } stats; - celix_array_list_t *scheduledEventQueue; //entry = celix_framework_scheduled_event_t*. Used for scheduled events + celix_long_hash_map_t *scheduledEvents; //key = scheduled event id, entry = celix_framework_scheduled_event_t*. Used for scheduled events } dispatcher; celix_framework_logger_t* logger; @@ -471,6 +471,22 @@ long celix_framework_addScheduledEvent(celix_framework_t* fw, double intervalInSeconds, void* eventData, void (*eventCallback)(void* eventData)); + +/** + * @brief Wakeup a scheduled event. + * + * If waitTimeInSeconds is greater than 0, this function will block until the scheduled event callback is called or the wait time is reached. + * If waitTimeInSeconds is 0 or negative, this function will return immediately. + * + * @param[in] fw The framework. + * @param[in] scheduledEventId The scheduled event id to wakeup. + * @param[in] waitTimeInSeconds If greater than 0, this function will block until the scheduled event callback + * is called or the provided timeout is reached. + * @return CELIX_SUCCESS if the scheduled event is woken up, CELIX_ILLEGAL_ARGUMENT if the scheduled event id is not known. + */ +celix_status_t celix_framework_wakeupScheduledEvent(celix_framework_t* fw, + long scheduledEventId, + double waitTimeInSeconds); /** * @brief Cancel a scheduled event.
diff --git a/libs/utils/gtest/src/ThreadsTestSuite.cc b/libs/utils/gtest/src/ThreadsTestSuite.cc index 999415a5..d23e5260 100644 --- a/libs/utils/gtest/src/ThreadsTestSuite.cc +++ b/libs/utils/gtest/src/ThreadsTestSuite.cc @@ -21,6 +21,7 @@ #include <csignal> +#include "celix_utils.h" #include "celix_threads.h" class ThreadsTestSuite : public ::testing::Test { @@ -278,6 +279,64 @@ TEST_F(ThreadsTestSuite, CondBroadcastTest) { free(param); } +TEST_F(ThreadsTestSuite, CondTimedWaitTest) { + celix_thread_mutex_t mutex; + celix_thread_cond_t cond; + + auto status = celixThreadMutex_create(&mutex, NULL); + ASSERT_EQ(status, CELIX_SUCCESS); + status = celixThreadCondition_init(&cond, NULL); + ASSERT_EQ(status, CELIX_SUCCESS); + + //Test with NULL abstime + status = celixThreadCondition_waitUntil(&cond, &mutex, NULL); + ASSERT_EQ(status, CELIX_ILLEGAL_ARGUMENT); + + //: Test with negative tv_sec + struct timespec abstime = {-1, 0}; + status = celixThreadCondition_waitUntil(&cond, &mutex, &abstime); + ASSERT_EQ(status, CELIX_ILLEGAL_ARGUMENT); + + //Test with negative tv_nsec + abstime = {0, -1}; + status = celixThreadCondition_waitUntil(&cond, &mutex, &abstime); + ASSERT_EQ(status, CELIX_ILLEGAL_ARGUMENT); + + //Test with valid abstime + auto start = celix_gettime(CLOCK_MONOTONIC); + auto targetEnd = celix_addDelayInSecondsToTime(&start, 0.001); + pthread_mutex_lock(&mutex); + status = celixThreadCondition_waitUntil(&cond, &mutex, &targetEnd); + ASSERT_EQ(status, ETIMEDOUT); + pthread_mutex_unlock(&mutex); + auto end = celix_gettime(CLOCK_MONOTONIC); + EXPECT_NEAR(celix_difftime(&end, &start), 0.001, 0.01); +} + +TEST_F(ThreadsTestSuite, CondWaitForTest) { + celix_thread_mutex_t mutex; + celix_thread_cond_t cond; + + auto status = celixThreadMutex_create(&mutex, NULL); + ASSERT_EQ(status, CELIX_SUCCESS); + status = celixThreadCondition_init(&cond, NULL); + ASSERT_EQ(status, CELIX_SUCCESS); + + //Test with negative timeout + status = celixThreadCondition_waitFor(&cond, 
&mutex, -1); + ASSERT_EQ(status, CELIX_ILLEGAL_ARGUMENT); + + //Test with valid delay + auto start = celix_gettime(CLOCK_MONOTONIC); + pthread_mutex_lock(&mutex); + status = celixThreadCondition_waitFor(&cond, &mutex, 0.001); + ASSERT_EQ(status, ETIMEDOUT); + pthread_mutex_unlock(&mutex); + auto end = celix_gettime(CLOCK_MONOTONIC); + EXPECT_NEAR(celix_difftime(&end, &start), 0.001, 0.01); +} + + //----------------------CELIX READ-WRITE LOCK TESTS---------------------- TEST_F(ThreadsTestSuite, CreateRwLockTest) { diff --git a/libs/utils/gtest/src/TimeUtilsTestSuite.cc b/libs/utils/gtest/src/TimeUtilsTestSuite.cc index ccbca692..303191eb 100644 --- a/libs/utils/gtest/src/TimeUtilsTestSuite.cc +++ b/libs/utils/gtest/src/TimeUtilsTestSuite.cc @@ -54,4 +54,52 @@ TEST_F(TimeUtilsTestSuite, ElapsedTimeTest) { auto diff = celix_elapsedtime(CLOCK_MONOTONIC, t1); EXPECT_GE(diff, 0.00001 /*10 us*/); EXPECT_LT(diff, 0.1 /*1/10 s, note do want to rely on accuracy of sleep_for*/); -} \ No newline at end of file +} + +TEST_F(TimeUtilsTestSuite, AddDelayInSecondsToTimeTest) { + //Test with NULL time and delayInSeconds = 0 + struct timespec delayedTime = celix_addDelayInSecondsToTime(NULL, 0); + ASSERT_EQ(delayedTime.tv_sec, 0); + ASSERT_EQ(delayedTime.tv_nsec, 0); + + //Test with NULL time and delayInSeconds = 1.5 + delayedTime = celix_addDelayInSecondsToTime(NULL, 1.5); + struct timespec expectedTime = {1, 500000000}; + ASSERT_EQ(delayedTime.tv_sec, expectedTime.tv_sec); + ASSERT_EQ(delayedTime.tv_nsec, expectedTime.tv_nsec); + + //Test with time.tv_nsec + nanoseconds > CELIX_NS_IN_SEC + struct timespec time = {0, 500000000}; + delayedTime = celix_addDelayInSecondsToTime(&time, 0.6); + expectedTime = {1, 100000000}; + ASSERT_EQ(delayedTime.tv_sec, expectedTime.tv_sec); + ASSERT_EQ(delayedTime.tv_nsec, expectedTime.tv_nsec); + + //Test with time.tv_nsec + nanoseconds < 0 + time = {1, 500000000}; + delayedTime = celix_addDelayInSecondsToTime(&time, -0.6); + expectedTime = {0, 
900000000}; + ASSERT_EQ(delayedTime.tv_sec, expectedTime.tv_sec); + ASSERT_EQ(delayedTime.tv_nsec, expectedTime.tv_nsec); + + //Test with time.tv_nsec + nanoseconds = CELIX_NS_IN_SEC + time = {1, 500000000}; + delayedTime = celix_addDelayInSecondsToTime(&time, 0.5); + expectedTime = {2, 0}; + ASSERT_EQ(delayedTime.tv_sec, expectedTime.tv_sec); + ASSERT_EQ(delayedTime.tv_nsec, expectedTime.tv_nsec); + + //Test with time.tv_nsec + nanoseconds < CELIX_NS_IN_SEC + time = {1, 500000000}; + delayedTime = celix_addDelayInSecondsToTime(&time, 0.4); + expectedTime = {1, 900000000}; + ASSERT_EQ(delayedTime.tv_sec, expectedTime.tv_sec); + ASSERT_EQ(delayedTime.tv_nsec, expectedTime.tv_nsec); + + //Test if time becomes negative + time = {0, 500000000}; + delayedTime = celix_addDelayInSecondsToTime(&time, -1.5); + expectedTime = {-1, 0}; + ASSERT_EQ(delayedTime.tv_sec, expectedTime.tv_sec); + ASSERT_EQ(delayedTime.tv_nsec, expectedTime.tv_nsec); +} diff --git a/libs/utils/include/celix_threads.h b/libs/utils/include/celix_threads.h index fba1e246..8d8f873a 100644 --- a/libs/utils/include/celix_threads.h +++ b/libs/utils/include/celix_threads.h @@ -130,6 +130,43 @@ CELIX_UTILS_EXPORT celix_status_t celixThreadCondition_wait(celix_thread_cond_t CELIX_UTILS_EXPORT celix_status_t celixThreadCondition_timedwaitRelative(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds); +/** + * @brief Wait for the condition to be signaled or until the given delayInSeconds is reached. + * + * @section Errors + * - CELIX_SUCCESS if the condition is signaled before the delayInSeconds is reached. + * - CELIX_ILLEGAL_ARGUMENT if the delayInSeconds is negative. + * - ENOTRECOVERABLE if the state protected by the mutex is not recoverable. + * - ETIMEDOUT If the delayInSeconds has passed. + * + * + * @param[in] cond The condition to wait for. + * @param[in] mutex The (locked) mutex to use. 
+ * @param[in] delayInSeconds The delay in seconds to wait for the condition to be signaled. + * @return CELIX_SUCCESS if the condition is signaled before the delayInSeconds is reached. + */ +CELIX_UTILS_EXPORT celix_status_t celixThreadCondition_waitFor(celix_thread_cond_t* cond, + celix_thread_mutex_t* mutex, + double delayInSeconds); + +/** + * @brief Wait for the condition to be signaled or until the given absolute time is reached. + * + * @section Errors + * - CELIX_SUCCESS if the condition is signaled before the abstime is reached. + * - CELIX_ILLEGAL_ARGUMENT if the abstime is NULL or negative. + * - ENOTRECOVERABLE if the state protected by the mutex is not recoverable. + * - ETIMEDOUT If the abstime has passed. + * + * @param[in] cond The condition to wait for. + * @param[in] mutex The (locked) mutex to use. + * @param[in] abstime The absolute time to wait for the condition to be signaled. + * @return CELIX_SUCCESS if the condition is signaled before the abstime is reached. + */ +CELIX_UTILS_EXPORT celix_status_t celixThreadCondition_waitUntil(celix_thread_cond_t* cond, + celix_thread_mutex_t* mutex, + const struct timespec* abstime); + CELIX_UTILS_EXPORT celix_status_t celixThreadCondition_broadcast(celix_thread_cond_t *cond); CELIX_UTILS_EXPORT celix_status_t celixThreadCondition_signal(celix_thread_cond_t *cond); diff --git a/libs/utils/include/celix_utils.h b/libs/utils/include/celix_utils.h index ff1d3d32..f33413fd 100644 --- a/libs/utils/include/celix_utils.h +++ b/libs/utils/include/celix_utils.h @@ -158,6 +158,14 @@ CELIX_UTILS_EXPORT double celix_difftime(const struct timespec *tBegin, const st */ CELIX_UTILS_EXPORT struct timespec celix_gettime(clockid_t clockId); +/** + * @brief Returns the absolute time for the provided delay in seconds. + * @param[in] time The time to add the delay to. Can be NULL, in which case the time is 0. + * @param[in] delayInSeconds The delay in seconds. + * @return A new time with the delay added.
+*/ +CELIX_UTILS_EXPORT struct timespec celix_addDelayInSecondsToTime(const struct timespec* time, double delayInSeconds); + /** * @brief Returns the elapsed time - in seconds - relative to the startTime * using the clock for the provided clockid. diff --git a/libs/utils/src/celix_threads.c b/libs/utils/src/celix_threads.c index bd56f3d2..eb01fa2c 100644 --- a/libs/utils/src/celix_threads.c +++ b/libs/utils/src/celix_threads.c @@ -197,6 +197,26 @@ celix_status_t celixThreadCondition_timedwaitRelative(celix_thread_cond_t *cond, } #endif +CELIX_UTILS_EXPORT celix_status_t celixThreadCondition_waitFor(celix_thread_cond_t* cond, + celix_thread_mutex_t* mutex, + double delayInSeconds) { + if (delayInSeconds < 0) { + return CELIX_ILLEGAL_ARGUMENT; + } + struct timespec now = celix_gettime(CLOCK_MONOTONIC); + struct timespec abstime = celix_addDelayInSecondsToTime(&now, delayInSeconds); + return celixThreadCondition_waitUntil(cond, mutex, &abstime); +} + +CELIX_UTILS_EXPORT celix_status_t celixThreadCondition_waitUntil(celix_thread_cond_t* cond, + celix_thread_mutex_t* mutex, + const struct timespec* abstime) { + if (abstime == NULL || abstime->tv_sec < 0 || abstime->tv_nsec < 0) { + return CELIX_ILLEGAL_ARGUMENT; + } + return pthread_cond_timedwait(cond, mutex, abstime); +} + celix_status_t celixThreadCondition_broadcast(celix_thread_cond_t *cond) { return pthread_cond_broadcast(cond); } diff --git a/libs/utils/src/utils.c b/libs/utils/src/utils.c index 95ab561f..1d1269af 100644 --- a/libs/utils/src/utils.c +++ b/libs/utils/src/utils.c @@ -265,6 +265,30 @@ struct timespec celix_gettime(clockid_t clockId) { return t; } +struct timespec celix_addDelayInSecondsToTime(const struct timespec* time, double delayInSeconds) { + struct timespec delayedTime; + memset(&delayedTime, 0, sizeof(delayedTime)); + memset(&delayedTime, 0, sizeof(delayedTime)); + if (time != NULL) { + delayedTime = *time; + } + + long seconds = (long)delayInSeconds; + long nanoseconds = (delayInSeconds - 
seconds) * CELIX_NS_IN_SEC; + delayedTime.tv_sec += seconds; + delayedTime.tv_nsec += nanoseconds; + + if (delayedTime.tv_nsec >= CELIX_NS_IN_SEC) { + delayedTime.tv_sec += 1; + delayedTime.tv_nsec -= CELIX_NS_IN_SEC; + } else if (delayedTime.tv_nsec < 0) { + delayedTime.tv_sec -= 1; + delayedTime.tv_nsec += CELIX_NS_IN_SEC; + } + + return delayedTime; +} + double celix_elapsedtime(clockid_t clockId, struct timespec startTime) { struct timespec now = celix_gettime(clockId); return celix_difftime(&startTime, &now);
