pnoltes commented on code in PR #773: URL: https://github.com/apache/celix/pull/773#discussion_r1848890413
########## bundles/event_admin/event_admin/diagrams/remote_event_delivery_seq.puml: ########## @@ -0,0 +1,39 @@ +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +@startuml +'https://plantuml.com/sequence-diagram + +box FrameworkA +participant "Event Admin" as EventAdmin1 +participant "Remote Provider" as RemoteProvider1 +end box + +box FrameworkB +participant "Remote Provider" as RemoteProvider2 +participant "Event Admin" as EventAdmin2 +end box + +-\EventAdmin1:postEvent/sendEvent +EventAdmin1->EventAdmin1:Delivery event to local event handlers +alt "celix.event.remote.enable" is true + EventAdmin1->EventAdmin1:Unset the "celix.event.remote.enable" property + EventAdmin1->RemoteProvider1:postEvent/sendEvent + RemoteProvider1->RemoteProvider2:IPC Review Comment: This could be done through IPC, but also using any network-based transport. So maybe this is better named as "IPC or Network" ########## bundles/event_admin/event_admin/src/celix_event_admin.c: ########## @@ -61,15 +64,24 @@ typedef struct celix_event_entry { celix_long_hash_map_t* eventHandlers;//key: event handler service id, value: null }celix_event_entry_t; +typedef struct celix_event_seq_id_cache { + struct timespec lastModified; + long seqIdBuffer[CELIX_EVENT_ADMIN_MAX_EVENT_SEQ_ID_CACHE_SIZE]; +}celix_event_seq_id_cache_t; Review Comment: nitpick: missing space between `}` and `celix_event_seq_id_cache_t` ########## bundles/event_admin/event_admin/src/celix_event_admin.c: ########## @@ -415,6 +454,131 @@ int celix_eventAdmin_removeEventHandlerWithProperties(void* handle, void* svc, c return CELIX_SUCCESS; } +int celix_eventAdmin_addRemoteProviderService(void* handle, void* svc, const celix_properties_t* props) { + assert(handle != NULL); + assert(svc != NULL); + celix_event_admin_t* ea = handle; + long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L); + if (serviceId < 0) { Review Comment: IMO this should not be possible, so maybe a `assert(serviceId >= 0);` is enough; this saves some test code. ########## bundles/event_admin/event_admin/src/celix_event_admin.c: ########## @@ -415,6 +454,131 @@ int celix_eventAdmin_removeEventHandlerWithProperties(void* handle, void* svc, c return CELIX_SUCCESS; } +int celix_eventAdmin_addRemoteProviderService(void* handle, void* svc, const celix_properties_t* props) { + assert(handle != NULL); + assert(svc != NULL); + celix_event_admin_t* ea = handle; + long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L); + if (serviceId < 0) { + celix_logHelper_error(ea->logHelper, "Remote provider service id is invalid."); + return CELIX_ILLEGAL_ARGUMENT; + } + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_longHashMap_put(ea->remoteProviderServices, serviceId, svc); + return CELIX_SUCCESS; +} + +int celix_eventAdmin_removeRemoteProviderService(void* handle, void* svc, const celix_properties_t* props) { + assert(handle != NULL); + assert(svc != NULL); + celix_event_admin_t* ea = handle; + long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L); + if (serviceId < 0) { + celix_logHelper_error(ea->logHelper, "Remote provider service id is invalid."); + return CELIX_ILLEGAL_ARGUMENT; + } + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_longHashMap_remove(ea->remoteProviderServices, serviceId); + return CELIX_SUCCESS; +} + +static void celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(celix_event_admin_t* ea) { + if (celix_stringHashMap_size(ea->eventSeqIdCache) > 16) { + celix_string_hash_map_iterator_t iter = celix_stringHashMap_begin(ea->eventSeqIdCache); + while (!celix_stringHashMapIterator_isEnd(&iter)) { + celix_event_seq_id_cache_t* cache = iter.value.ptrValue; + if (celix_elapsedtime(CLOCK_MONOTONIC, cache->lastModified) > 60*60/*1h*/) { + celix_stringHashMapIterator_remove(&iter); + } else { + celix_stringHashMapIterator_next(&iter); + } + } + } + return; +} + +static bool celix_eventAdmin_isDuplicateEvent(celix_event_admin_t* ea, const char* topic CELIX_UNUSED, const celix_properties_t* properties) { + const char* remoteFwUUID = celix_properties_get(properties, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, NULL); + if (remoteFwUUID == NULL) { + return false; + } + long seqId = celix_properties_getAsLong(properties, CELIX_EVENT_REMOTE_SEQ_ID, -1L); + if (seqId <= 0) { + return false; + } + long seqIdMod = seqId % CELIX_EVENT_ADMIN_MAX_EVENT_SEQ_ID_CACHE_SIZE; + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_event_seq_id_cache_t* seqIdCache = celix_stringHashMap_get(ea->eventSeqIdCache, remoteFwUUID); + if (seqIdCache == NULL) { + celix_autofree celix_event_seq_id_cache_t* cache = calloc(1, sizeof(*cache)); + if (cache == NULL) { + celix_logHelper_error(ea->logHelper, "Failed to create event seq id cache for %s.", remoteFwUUID); + return false; + } + celix_status_t status = celix_stringHashMap_put(ea->eventSeqIdCache, remoteFwUUID, cache); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(ea->logHelper, "Failed to add event seq id cache for %s.", remoteFwUUID); + return false; + } + seqIdCache = celix_steal_ptr(cache); + } + seqIdCache->lastModified = celix_gettime(CLOCK_MONOTONIC); + if (seqIdCache->seqIdBuffer[seqIdMod] == seqId) { + return true; + } + seqIdCache->seqIdBuffer[seqIdMod] = seqId; + + celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(ea); + + return false; +} + +static long celix_eventAdmin_getEventSeqId(celix_event_admin_t* ea) { + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + long seqId = ea->nextSeqId++; + if (seqId <= 0) { Review Comment: nextSeqId is initialised with 1, so this should not be possible (dead code). ########## bundles/event_admin/examples/res/mosquitto.conf: ########## @@ -0,0 +1,904 @@ +# Config file for mosquitto Review Comment: Missing apache license comment text ########## bundles/event_admin/event_admin/src/celix_event_admin.c: ########## @@ -415,6 +454,131 @@ int celix_eventAdmin_removeEventHandlerWithProperties(void* handle, void* svc, c return CELIX_SUCCESS; } +int celix_eventAdmin_addRemoteProviderService(void* handle, void* svc, const celix_properties_t* props) { + assert(handle != NULL); + assert(svc != NULL); + celix_event_admin_t* ea = handle; + long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L); + if (serviceId < 0) { + celix_logHelper_error(ea->logHelper, "Remote provider service id is invalid."); + return CELIX_ILLEGAL_ARGUMENT; + } + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_longHashMap_put(ea->remoteProviderServices, serviceId, svc); + return CELIX_SUCCESS; +} + +int celix_eventAdmin_removeRemoteProviderService(void* handle, void* svc, const celix_properties_t* props) { + assert(handle != NULL); + assert(svc != NULL); + celix_event_admin_t* ea = handle; + long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L); + if (serviceId < 0) { + celix_logHelper_error(ea->logHelper, "Remote provider service id is invalid."); + return CELIX_ILLEGAL_ARGUMENT; + } + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_longHashMap_remove(ea->remoteProviderServices, serviceId); + return CELIX_SUCCESS; +} + +static void celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(celix_event_admin_t* ea) { Review Comment: The name of this function does not really describe what is done. I think something like `celix_eventAdmin_cleanupOldEventSeqIdCache ` is more descriptive. ########## bundles/event_admin/event_admin/src/celix_event_admin.c: ########## @@ -415,6 +454,131 @@ int celix_eventAdmin_removeEventHandlerWithProperties(void* handle, void* svc, c return CELIX_SUCCESS; } +int celix_eventAdmin_addRemoteProviderService(void* handle, void* svc, const celix_properties_t* props) { + assert(handle != NULL); + assert(svc != NULL); + celix_event_admin_t* ea = handle; + long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L); + if (serviceId < 0) { + celix_logHelper_error(ea->logHelper, "Remote provider service id is invalid."); + return CELIX_ILLEGAL_ARGUMENT; + } + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_longHashMap_put(ea->remoteProviderServices, serviceId, svc); + return CELIX_SUCCESS; +} + +int celix_eventAdmin_removeRemoteProviderService(void* handle, void* svc, const celix_properties_t* props) { + assert(handle != NULL); + assert(svc != NULL); + celix_event_admin_t* ea = handle; + long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L); + if (serviceId < 0) { + celix_logHelper_error(ea->logHelper, "Remote provider service id is invalid."); + return CELIX_ILLEGAL_ARGUMENT; + } + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_longHashMap_remove(ea->remoteProviderServices, serviceId); + return CELIX_SUCCESS; +} + +static void celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(celix_event_admin_t* ea) { + if (celix_stringHashMap_size(ea->eventSeqIdCache) > 16) { + celix_string_hash_map_iterator_t iter = celix_stringHashMap_begin(ea->eventSeqIdCache); + while (!celix_stringHashMapIterator_isEnd(&iter)) { + celix_event_seq_id_cache_t* cache = iter.value.ptrValue; + if (celix_elapsedtime(CLOCK_MONOTONIC, cache->lastModified) > 60*60/*1h*/) { Review Comment: I think it would be better to make the 1h threshold configureable using a config property. ########## bundles/event_admin/event_admin/src/celix_event_admin.c: ########## @@ -61,15 +64,24 @@ typedef struct celix_event_entry { celix_long_hash_map_t* eventHandlers;//key: event handler service id, value: null }celix_event_entry_t; +typedef struct celix_event_seq_id_cache { + struct timespec lastModified; + long seqIdBuffer[CELIX_EVENT_ADMIN_MAX_EVENT_SEQ_ID_CACHE_SIZE]; +}celix_event_seq_id_cache_t; + struct celix_event_admin { celix_bundle_context_t* ctx; celix_log_helper_t* logHelper; unsigned int handlerThreadNr; - celix_thread_rwlock_t lock;//projects: channels,eventHandlers + const char* fwUUID; + celix_thread_rwlock_t lock;//projects: channels,eventHandlers,eventSeqIdCache,remoteProviderServices Review Comment: and nextSeqId ########## bundles/event_admin/event_admin/src/celix_event_admin.c: ########## @@ -415,6 +454,131 @@ int celix_eventAdmin_removeEventHandlerWithProperties(void* handle, void* svc, c return CELIX_SUCCESS; } +int celix_eventAdmin_addRemoteProviderService(void* handle, void* svc, const celix_properties_t* props) { + assert(handle != NULL); + assert(svc != NULL); + celix_event_admin_t* ea = handle; + long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L); + if (serviceId < 0) { + celix_logHelper_error(ea->logHelper, "Remote provider service id is invalid."); + return CELIX_ILLEGAL_ARGUMENT; + } + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_longHashMap_put(ea->remoteProviderServices, serviceId, svc); + return CELIX_SUCCESS; +} + +int celix_eventAdmin_removeRemoteProviderService(void* handle, void* svc, const celix_properties_t* props) { + assert(handle != NULL); + assert(svc != NULL); + celix_event_admin_t* ea = handle; + long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L); + if (serviceId < 0) { + celix_logHelper_error(ea->logHelper, "Remote provider service id is invalid."); + return CELIX_ILLEGAL_ARGUMENT; + } + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_longHashMap_remove(ea->remoteProviderServices, serviceId); + return CELIX_SUCCESS; +} + +static void celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(celix_event_admin_t* ea) { + if (celix_stringHashMap_size(ea->eventSeqIdCache) > 16) { + celix_string_hash_map_iterator_t iter = celix_stringHashMap_begin(ea->eventSeqIdCache); + while (!celix_stringHashMapIterator_isEnd(&iter)) { + celix_event_seq_id_cache_t* cache = iter.value.ptrValue; + if (celix_elapsedtime(CLOCK_MONOTONIC, cache->lastModified) > 60*60/*1h*/) { + celix_stringHashMapIterator_remove(&iter); + } else { + celix_stringHashMapIterator_next(&iter); + } + } + } + return; +} + +static bool celix_eventAdmin_isDuplicateEvent(celix_event_admin_t* ea, const char* topic CELIX_UNUSED, const celix_properties_t* properties) { + const char* remoteFwUUID = celix_properties_get(properties, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, NULL); + if (remoteFwUUID == NULL) { + return false; + } + long seqId = celix_properties_getAsLong(properties, CELIX_EVENT_REMOTE_SEQ_ID, -1L); + if (seqId <= 0) { + return false; + } + long seqIdMod = seqId % CELIX_EVENT_ADMIN_MAX_EVENT_SEQ_ID_CACHE_SIZE; + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_event_seq_id_cache_t* seqIdCache = celix_stringHashMap_get(ea->eventSeqIdCache, remoteFwUUID); + if (seqIdCache == NULL) { + celix_autofree celix_event_seq_id_cache_t* cache = calloc(1, sizeof(*cache)); + if (cache == NULL) { + celix_logHelper_error(ea->logHelper, "Failed to create event seq id cache for %s.", remoteFwUUID); + return false; + } + celix_status_t status = celix_stringHashMap_put(ea->eventSeqIdCache, remoteFwUUID, cache); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(ea->logHelper, "Failed to add event seq id cache for %s.", remoteFwUUID); + return false; + } + seqIdCache = celix_steal_ptr(cache); + } + seqIdCache->lastModified = celix_gettime(CLOCK_MONOTONIC); + if (seqIdCache->seqIdBuffer[seqIdMod] == seqId) { + return true; + } + seqIdCache->seqIdBuffer[seqIdMod] = seqId; + + celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(ea); + + return false; +} + +static long celix_eventAdmin_getEventSeqId(celix_event_admin_t* ea) { + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + long seqId = ea->nextSeqId++; + if (seqId <= 0) { + seqId = 1; + ea->nextSeqId = seqId + 1; + } + return seqId; +} + +static int celix_eventAdmin_deliverEventToRemote(celix_event_admin_t* ea, const char* topic, const celix_properties_t* props, bool async) { + celix_autoptr(celix_properties_t) remoteProps = celix_properties_copy(props); + if (remoteProps == NULL) { + celix_logHelper_error(ea->logHelper, "Failed to copy remote properties for event %s.", topic); + return ENOMEM; + } + celix_properties_unset(remoteProps, CELIX_EVENT_REMOTE_ENABLE); + celix_status_t status = celix_properties_set(remoteProps, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, ea->fwUUID); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(ea->logHelper, "Failed to set remote framework uuid for event %s.", topic); + return status; + } + long seqId = celix_eventAdmin_getEventSeqId(ea); + status = celix_properties_setLong(remoteProps, CELIX_EVENT_REMOTE_SEQ_ID, seqId); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(ea->logHelper, "Failed to set remote seq id for event %s.", topic); + return status; + } + celix_auto(celix_rwlock_rlock_guard_t) rLockGuard = celixRwlockRlockGuard_init(&ea->lock); + CELIX_LONG_HASH_MAP_ITERATE(ea->remoteProviderServices, iter) { + celix_event_remote_provider_service_t* remoteProvider = iter.value.ptrValue; + if (async) { + celix_logHelper_trace(ea->logHelper, "Post event %s to remote provider.", topic); + status = remoteProvider->postEvent(remoteProvider->handle, topic, remoteProps); Review Comment: Note that the `postEvent` and `sendEvent` is called inside a lock. IMO this adds a constraint to the remote provider implementations to not call any services in the `postEvent` or `sendEvent` , because this could lead to - a unintentionally - call to another post/send event which is then forwarded to the remote providers -> deadlock. Or this should be clearly documented on the remote provider service documentation, or the usage of remote providers should be protected with a use count. something like (pseudo): ``` addRemoteProvider: - create provider entry with a use count of 0 - lock and add entry to map removeRemoteProvider: - lock - remote provider entry from map - unlock - wait for use count 0 - free entry celix_eventAdmin_deliverEventToRemote: - create local array list - lock - get entry, increase use count and add entry to local list - unlock - loop through list and call postEvent or sendEvent - lock - loop through list and decrease use count - unlock ``` This ensure that services are called outside a lock, but introduces quite to additional complexity. That being said, I would prefer such a solution. Especially for remote calls I have encountered quite a few cycling deadlocks. ########## bundles/event_admin/event_admin/src/celix_event_admin.c: ########## @@ -415,6 +454,131 @@ int celix_eventAdmin_removeEventHandlerWithProperties(void* handle, void* svc, c return CELIX_SUCCESS; } +int celix_eventAdmin_addRemoteProviderService(void* handle, void* svc, const celix_properties_t* props) { + assert(handle != NULL); + assert(svc != NULL); + celix_event_admin_t* ea = handle; + long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L); + if (serviceId < 0) { + celix_logHelper_error(ea->logHelper, "Remote provider service id is invalid."); + return CELIX_ILLEGAL_ARGUMENT; + } + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_longHashMap_put(ea->remoteProviderServices, serviceId, svc); + return CELIX_SUCCESS; +} + +int celix_eventAdmin_removeRemoteProviderService(void* handle, void* svc, const celix_properties_t* props) { + assert(handle != NULL); + assert(svc != NULL); + celix_event_admin_t* ea = handle; + long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L); + if (serviceId < 0) { + celix_logHelper_error(ea->logHelper, "Remote provider service id is invalid."); + return CELIX_ILLEGAL_ARGUMENT; + } + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_longHashMap_remove(ea->remoteProviderServices, serviceId); + return CELIX_SUCCESS; +} + +static void celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(celix_event_admin_t* ea) { + if (celix_stringHashMap_size(ea->eventSeqIdCache) > 16) { + celix_string_hash_map_iterator_t iter = celix_stringHashMap_begin(ea->eventSeqIdCache); + while (!celix_stringHashMapIterator_isEnd(&iter)) { + celix_event_seq_id_cache_t* cache = iter.value.ptrValue; + if (celix_elapsedtime(CLOCK_MONOTONIC, cache->lastModified) > 60*60/*1h*/) { + celix_stringHashMapIterator_remove(&iter); + } else { + celix_stringHashMapIterator_next(&iter); + } + } + } + return; +} + +static bool celix_eventAdmin_isDuplicateEvent(celix_event_admin_t* ea, const char* topic CELIX_UNUSED, const celix_properties_t* properties) { + const char* remoteFwUUID = celix_properties_get(properties, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, NULL); + if (remoteFwUUID == NULL) { + return false; + } + long seqId = celix_properties_getAsLong(properties, CELIX_EVENT_REMOTE_SEQ_ID, -1L); + if (seqId <= 0) { + return false; + } + long seqIdMod = seqId % CELIX_EVENT_ADMIN_MAX_EVENT_SEQ_ID_CACHE_SIZE; + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_event_seq_id_cache_t* seqIdCache = celix_stringHashMap_get(ea->eventSeqIdCache, remoteFwUUID); + if (seqIdCache == NULL) { + celix_autofree celix_event_seq_id_cache_t* cache = calloc(1, sizeof(*cache)); + if (cache == NULL) { + celix_logHelper_error(ea->logHelper, "Failed to create event seq id cache for %s.", remoteFwUUID); + return false; + } + celix_status_t status = celix_stringHashMap_put(ea->eventSeqIdCache, remoteFwUUID, cache); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(ea->logHelper, "Failed to add event seq id cache for %s.", remoteFwUUID); + return false; + } + seqIdCache = celix_steal_ptr(cache); + } + seqIdCache->lastModified = celix_gettime(CLOCK_MONOTONIC); + if (seqIdCache->seqIdBuffer[seqIdMod] == seqId) { + return true; + } + seqIdCache->seqIdBuffer[seqIdMod] = seqId; + + celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(ea); + + return false; +} + +static long celix_eventAdmin_getEventSeqId(celix_event_admin_t* ea) { + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + long seqId = ea->nextSeqId++; + if (seqId <= 0) { + seqId = 1; + ea->nextSeqId = seqId + 1; + } + return seqId; +} + +static int celix_eventAdmin_deliverEventToRemote(celix_event_admin_t* ea, const char* topic, const celix_properties_t* props, bool async) { + celix_autoptr(celix_properties_t) remoteProps = celix_properties_copy(props); + if (remoteProps == NULL) { + celix_logHelper_error(ea->logHelper, "Failed to copy remote properties for event %s.", topic); + return ENOMEM; + } + celix_properties_unset(remoteProps, CELIX_EVENT_REMOTE_ENABLE); + celix_status_t status = celix_properties_set(remoteProps, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, ea->fwUUID); Review Comment: 👍 I like the design of an opt-in with CELIX_EVENT_REMOTE_ENABLE and the "this is a remote event" indication with CELIX_EVENT_REMOTE_FRAMEWORK_UUID. ########## bundles/event_admin/event_admin_api/include/celix_event_constants.h: ########## @@ -106,6 +106,47 @@ extern "C" { */ #define CELIX_EVENT_TIMESTAMP "timestamp" +/** + * @brief It is a property of event to indicate the event is a remote event. The type of the value for this event property is Boolean. + * + * If the value is true, the event will be delivered to remote + * event handlers and local event handlers, otherwise, the event will be only delivered to local event handlers. + */ +#define CELIX_EVENT_REMOTE_ENABLE "celix.event.remote.enable" + +/** + * @brief The QoS of the remote event. The type of the value for this event property is integer. It indicates the quality of service of the remote event. If the value is not set, the remote provider should use a proper default value. + * + * The value must be one of the following: + * - 0: At most once delivery + * - 1: At least once delivery + * - 2: Exactly once delivery + */ +#define CELIX_EVENT_REMOTE_QOS "celix.event.remote.qos" Review Comment: Should this value not impact whether a sendEvent return a success or not? ########## bundles/event_admin/event_admin/src/celix_event_admin.c: ########## @@ -415,6 +454,131 @@ int celix_eventAdmin_removeEventHandlerWithProperties(void* handle, void* svc, c return CELIX_SUCCESS; } +int celix_eventAdmin_addRemoteProviderService(void* handle, void* svc, const celix_properties_t* props) { + assert(handle != NULL); + assert(svc != NULL); + celix_event_admin_t* ea = handle; + long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L); + if (serviceId < 0) { + celix_logHelper_error(ea->logHelper, "Remote provider service id is invalid."); + return CELIX_ILLEGAL_ARGUMENT; + } + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_longHashMap_put(ea->remoteProviderServices, serviceId, svc); + return CELIX_SUCCESS; +} + +int celix_eventAdmin_removeRemoteProviderService(void* handle, void* svc, const celix_properties_t* props) { + assert(handle != NULL); + assert(svc != NULL); + celix_event_admin_t* ea = handle; + long serviceId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L); + if (serviceId < 0) { + celix_logHelper_error(ea->logHelper, "Remote provider service id is invalid."); + return CELIX_ILLEGAL_ARGUMENT; + } + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_longHashMap_remove(ea->remoteProviderServices, serviceId); + return CELIX_SUCCESS; +} + +static void celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(celix_event_admin_t* ea) { + if (celix_stringHashMap_size(ea->eventSeqIdCache) > 16) { + celix_string_hash_map_iterator_t iter = celix_stringHashMap_begin(ea->eventSeqIdCache); + while (!celix_stringHashMapIterator_isEnd(&iter)) { + celix_event_seq_id_cache_t* cache = iter.value.ptrValue; + if (celix_elapsedtime(CLOCK_MONOTONIC, cache->lastModified) > 60*60/*1h*/) { + celix_stringHashMapIterator_remove(&iter); + } else { + celix_stringHashMapIterator_next(&iter); + } + } + } + return; +} + +static bool celix_eventAdmin_isDuplicateEvent(celix_event_admin_t* ea, const char* topic CELIX_UNUSED, const celix_properties_t* properties) { + const char* remoteFwUUID = celix_properties_get(properties, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, NULL); + if (remoteFwUUID == NULL) { + return false; + } + long seqId = celix_properties_getAsLong(properties, CELIX_EVENT_REMOTE_SEQ_ID, -1L); + if (seqId <= 0) { + return false; + } + long seqIdMod = seqId % CELIX_EVENT_ADMIN_MAX_EVENT_SEQ_ID_CACHE_SIZE; + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + celix_event_seq_id_cache_t* seqIdCache = celix_stringHashMap_get(ea->eventSeqIdCache, remoteFwUUID); + if (seqIdCache == NULL) { + celix_autofree celix_event_seq_id_cache_t* cache = calloc(1, sizeof(*cache)); + if (cache == NULL) { + celix_logHelper_error(ea->logHelper, "Failed to create event seq id cache for %s.", remoteFwUUID); + return false; + } + celix_status_t status = celix_stringHashMap_put(ea->eventSeqIdCache, remoteFwUUID, cache); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(ea->logHelper, "Failed to add event seq id cache for %s.", remoteFwUUID); + return false; + } + seqIdCache = celix_steal_ptr(cache); + } + seqIdCache->lastModified = celix_gettime(CLOCK_MONOTONIC); + if (seqIdCache->seqIdBuffer[seqIdMod] == seqId) { + return true; + } + seqIdCache->seqIdBuffer[seqIdMod] = seqId; + + celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(ea); + + return false; +} + +static long celix_eventAdmin_getEventSeqId(celix_event_admin_t* ea) { + celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&ea->lock); + long seqId = ea->nextSeqId++; + if (seqId <= 0) { + seqId = 1; + ea->nextSeqId = seqId + 1; + } + return seqId; +} + +static int celix_eventAdmin_deliverEventToRemote(celix_event_admin_t* ea, const char* topic, const celix_properties_t* props, bool async) { + celix_autoptr(celix_properties_t) remoteProps = celix_properties_copy(props); + if (remoteProps == NULL) { + celix_logHelper_error(ea->logHelper, "Failed to copy remote properties for event %s.", topic); + return ENOMEM; + } + celix_properties_unset(remoteProps, CELIX_EVENT_REMOTE_ENABLE); + celix_status_t status = celix_properties_set(remoteProps, CELIX_EVENT_REMOTE_FRAMEWORK_UUID, ea->fwUUID); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(ea->logHelper, "Failed to set remote framework uuid for event %s.", topic); + return status; + } + long seqId = celix_eventAdmin_getEventSeqId(ea); + status = celix_properties_setLong(remoteProps, CELIX_EVENT_REMOTE_SEQ_ID, seqId); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(ea->logHelper, "Failed to set remote seq id for event %s.", topic); + return status; + } + celix_auto(celix_rwlock_rlock_guard_t) rLockGuard = celixRwlockRlockGuard_init(&ea->lock); + CELIX_LONG_HASH_MAP_ITERATE(ea->remoteProviderServices, iter) { + celix_event_remote_provider_service_t* remoteProvider = iter.value.ptrValue; + if (async) { + celix_logHelper_trace(ea->logHelper, "Post event %s to remote provider.", topic); + status = remoteProvider->postEvent(remoteProvider->handle, topic, remoteProps); + } else { + celix_logHelper_trace(ea->logHelper, "Send event %s to remote provider.", topic); + status = remoteProvider->sendEvent(remoteProvider->handle, topic, remoteProps); + } + if (status != CELIX_SUCCESS) { + celix_logHelper_error(ea->logHelper, "Failed to deliver event %s to remote provider(%ld).", topic, iter.key); Review Comment: Although I think are guidelines could be more clearly on what log levels to use: > For log levels use the following guidelines: - trace: Use this level for very detailed that you would only want to have while diagnosing problems. - debug: This level should be used for information that might be helpful in diagnosing problems or understanding what's going on, but is too verbose to be enabled by default. - info: Use this level for general operational messages that aren't tied to any specific problem or error condition. They provide insight into the normal behavior of the system. Examples include startup/shutdown messages, configuration assumptions, etc. - warning: Use this level to report an issue from which the system can recover, but which might indicate a potential problem. - error: This level should be used to report issues that need immediate attention and might prevent the system from functioning correctly. These are problems that are unexpected and affect functionality, but not so severe that the process needs to stop. Examples include runtime errors, inability to connect to a service, etc. - fatal: Use this level to report severe errors that prevent the program from continuing to run. After logging a fatal error, the program will typically terminate. IMO this should be warning, because the system cannot deliver an event remotely. This is potentially a problem, but system can continue working locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@celix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org