pnoltes commented on code in PR #773:
URL: https://github.com/apache/celix/pull/773#discussion_r1848890413


##########
bundles/event_admin/event_admin/diagrams/remote_event_delivery_seq.puml:
##########
@@ -0,0 +1,39 @@
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+@startuml
+'https://plantuml.com/sequence-diagram
+
+box FrameworkA
+participant "Event Admin" as EventAdmin1
+participant "Remote Provider" as RemoteProvider1
+end box
+
+box FrameworkB
+participant "Remote Provider" as RemoteProvider2
+participant "Event Admin" as EventAdmin2
+end box
+
+-\EventAdmin1:postEvent/sendEvent
+EventAdmin1->EventAdmin1:Delivery event to local event handlers
+alt "celix.event.remote.enable" is true
+    EventAdmin1->EventAdmin1:Unset the "celix.event.remote.enable" property
+    EventAdmin1->RemoteProvider1:postEvent/sendEvent
+    RemoteProvider1->RemoteProvider2:IPC

Review Comment:
   This could be done through IPC, but also using any network-based transport. 
So maybe this is better named as "IPC or Network" 



##########
bundles/event_admin/event_admin/src/celix_event_admin.c:
##########
@@ -61,15 +64,24 @@ typedef struct celix_event_entry {
     celix_long_hash_map_t* eventHandlers;//key: event handler service id, 
value: null
 }celix_event_entry_t;
 
+typedef struct celix_event_seq_id_cache {
+    struct timespec lastModified;
+    long seqIdBuffer[CELIX_EVENT_ADMIN_MAX_EVENT_SEQ_ID_CACHE_SIZE];
+}celix_event_seq_id_cache_t;

Review Comment:
   nitpick: missing space between `}` and `celix_event_seq_id_cache_t`



##########
bundles/event_admin/event_admin/src/celix_event_admin.c:
##########
@@ -415,6 +454,131 @@ int 
celix_eventAdmin_removeEventHandlerWithProperties(void* handle, void* svc, c
     return CELIX_SUCCESS;
 }
 
+int celix_eventAdmin_addRemoteProviderService(void* handle, void* svc, const 
celix_properties_t* props) {
+    assert(handle != NULL);
+    assert(svc != NULL);
+    celix_event_admin_t* ea = handle;
+    long serviceId = celix_properties_getAsLong(props, 
CELIX_FRAMEWORK_SERVICE_ID, -1L);
+    if (serviceId < 0) {

Review Comment:
   IMO this should not be possible, so maybe a `assert(serviceId >= 0);` is 
enough; this saves some test code. 



##########
bundles/event_admin/event_admin/src/celix_event_admin.c:
##########
@@ -415,6 +454,131 @@ int 
celix_eventAdmin_removeEventHandlerWithProperties(void* handle, void* svc, c
     return CELIX_SUCCESS;
 }
 
+int celix_eventAdmin_addRemoteProviderService(void* handle, void* svc, const 
celix_properties_t* props) {
+    assert(handle != NULL);
+    assert(svc != NULL);
+    celix_event_admin_t* ea = handle;
+    long serviceId = celix_properties_getAsLong(props, 
CELIX_FRAMEWORK_SERVICE_ID, -1L);
+    if (serviceId < 0) {
+        celix_logHelper_error(ea->logHelper, "Remote provider service id is 
invalid.");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_longHashMap_put(ea->remoteProviderServices, serviceId, svc);
+    return CELIX_SUCCESS;
+}
+
+int celix_eventAdmin_removeRemoteProviderService(void* handle, void* svc, 
const celix_properties_t* props) {
+    assert(handle != NULL);
+    assert(svc != NULL);
+    celix_event_admin_t* ea = handle;
+    long serviceId = celix_properties_getAsLong(props, 
CELIX_FRAMEWORK_SERVICE_ID, -1L);
+    if (serviceId < 0) {
+        celix_logHelper_error(ea->logHelper, "Remote provider service id is 
invalid.");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_longHashMap_remove(ea->remoteProviderServices, serviceId);
+    return CELIX_SUCCESS;
+}
+
+static void 
celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(celix_event_admin_t* ea) 
{
+    if (celix_stringHashMap_size(ea->eventSeqIdCache) > 16) {
+        celix_string_hash_map_iterator_t iter = 
celix_stringHashMap_begin(ea->eventSeqIdCache);
+        while (!celix_stringHashMapIterator_isEnd(&iter)) {
+            celix_event_seq_id_cache_t* cache = iter.value.ptrValue;
+            if (celix_elapsedtime(CLOCK_MONOTONIC, cache->lastModified) > 
60*60/*1h*/) {
+                celix_stringHashMapIterator_remove(&iter);
+            } else {
+                celix_stringHashMapIterator_next(&iter);
+            }
+        }
+    }
+    return;
+}
+
+static bool celix_eventAdmin_isDuplicateEvent(celix_event_admin_t* ea, const 
char* topic CELIX_UNUSED, const celix_properties_t* properties) {
+    const char* remoteFwUUID = celix_properties_get(properties, 
CELIX_EVENT_REMOTE_FRAMEWORK_UUID, NULL);
+    if (remoteFwUUID == NULL) {
+        return false;
+    }
+    long seqId = celix_properties_getAsLong(properties, 
CELIX_EVENT_REMOTE_SEQ_ID, -1L);
+    if (seqId <= 0) {
+        return false;
+    }
+    long seqIdMod = seqId % CELIX_EVENT_ADMIN_MAX_EVENT_SEQ_ID_CACHE_SIZE;
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_event_seq_id_cache_t* seqIdCache = 
celix_stringHashMap_get(ea->eventSeqIdCache, remoteFwUUID);
+    if (seqIdCache == NULL) {
+        celix_autofree celix_event_seq_id_cache_t* cache = calloc(1, 
sizeof(*cache));
+        if (cache == NULL) {
+            celix_logHelper_error(ea->logHelper, "Failed to create event seq 
id cache for %s.", remoteFwUUID);
+            return false;
+        }
+        celix_status_t status = celix_stringHashMap_put(ea->eventSeqIdCache, 
remoteFwUUID, cache);
+        if (status != CELIX_SUCCESS) {
+            celix_logHelper_error(ea->logHelper, "Failed to add event seq id 
cache for %s.", remoteFwUUID);
+            return false;
+        }
+        seqIdCache = celix_steal_ptr(cache);
+    }
+    seqIdCache->lastModified = celix_gettime(CLOCK_MONOTONIC);
+    if (seqIdCache->seqIdBuffer[seqIdMod] == seqId) {
+        return true;
+    }
+    seqIdCache->seqIdBuffer[seqIdMod] = seqId;
+
+    celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(ea);
+
+    return false;
+}
+
+static long celix_eventAdmin_getEventSeqId(celix_event_admin_t* ea) {
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    long seqId = ea->nextSeqId++;
+    if (seqId <= 0) {

Review Comment:
   nextSeqId is initialised with 1, so this should not be possible (dead code). 



##########
bundles/event_admin/examples/res/mosquitto.conf:
##########
@@ -0,0 +1,904 @@
+# Config file for mosquitto

Review Comment:
   Missing apache license comment text



##########
bundles/event_admin/event_admin/src/celix_event_admin.c:
##########
@@ -415,6 +454,131 @@ int 
celix_eventAdmin_removeEventHandlerWithProperties(void* handle, void* svc, c
     return CELIX_SUCCESS;
 }
 
+int celix_eventAdmin_addRemoteProviderService(void* handle, void* svc, const 
celix_properties_t* props) {
+    assert(handle != NULL);
+    assert(svc != NULL);
+    celix_event_admin_t* ea = handle;
+    long serviceId = celix_properties_getAsLong(props, 
CELIX_FRAMEWORK_SERVICE_ID, -1L);
+    if (serviceId < 0) {
+        celix_logHelper_error(ea->logHelper, "Remote provider service id is 
invalid.");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_longHashMap_put(ea->remoteProviderServices, serviceId, svc);
+    return CELIX_SUCCESS;
+}
+
+int celix_eventAdmin_removeRemoteProviderService(void* handle, void* svc, 
const celix_properties_t* props) {
+    assert(handle != NULL);
+    assert(svc != NULL);
+    celix_event_admin_t* ea = handle;
+    long serviceId = celix_properties_getAsLong(props, 
CELIX_FRAMEWORK_SERVICE_ID, -1L);
+    if (serviceId < 0) {
+        celix_logHelper_error(ea->logHelper, "Remote provider service id is 
invalid.");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_longHashMap_remove(ea->remoteProviderServices, serviceId);
+    return CELIX_SUCCESS;
+}
+
+static void 
celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(celix_event_admin_t* ea) 
{

Review Comment:
   The name of this function does not really describe what is done. I think 
something like `celix_eventAdmin_cleanupOldEventSeqIdCache ` is more 
descriptive. 



##########
bundles/event_admin/event_admin/src/celix_event_admin.c:
##########
@@ -415,6 +454,131 @@ int 
celix_eventAdmin_removeEventHandlerWithProperties(void* handle, void* svc, c
     return CELIX_SUCCESS;
 }
 
+int celix_eventAdmin_addRemoteProviderService(void* handle, void* svc, const 
celix_properties_t* props) {
+    assert(handle != NULL);
+    assert(svc != NULL);
+    celix_event_admin_t* ea = handle;
+    long serviceId = celix_properties_getAsLong(props, 
CELIX_FRAMEWORK_SERVICE_ID, -1L);
+    if (serviceId < 0) {
+        celix_logHelper_error(ea->logHelper, "Remote provider service id is 
invalid.");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_longHashMap_put(ea->remoteProviderServices, serviceId, svc);
+    return CELIX_SUCCESS;
+}
+
+int celix_eventAdmin_removeRemoteProviderService(void* handle, void* svc, 
const celix_properties_t* props) {
+    assert(handle != NULL);
+    assert(svc != NULL);
+    celix_event_admin_t* ea = handle;
+    long serviceId = celix_properties_getAsLong(props, 
CELIX_FRAMEWORK_SERVICE_ID, -1L);
+    if (serviceId < 0) {
+        celix_logHelper_error(ea->logHelper, "Remote provider service id is 
invalid.");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_longHashMap_remove(ea->remoteProviderServices, serviceId);
+    return CELIX_SUCCESS;
+}
+
+static void 
celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(celix_event_admin_t* ea) 
{
+    if (celix_stringHashMap_size(ea->eventSeqIdCache) > 16) {
+        celix_string_hash_map_iterator_t iter = 
celix_stringHashMap_begin(ea->eventSeqIdCache);
+        while (!celix_stringHashMapIterator_isEnd(&iter)) {
+            celix_event_seq_id_cache_t* cache = iter.value.ptrValue;
+            if (celix_elapsedtime(CLOCK_MONOTONIC, cache->lastModified) > 
60*60/*1h*/) {

Review Comment:
   I think it would be better to make the 1h threshold configureable using a 
config property. 



##########
bundles/event_admin/event_admin/src/celix_event_admin.c:
##########
@@ -61,15 +64,24 @@ typedef struct celix_event_entry {
     celix_long_hash_map_t* eventHandlers;//key: event handler service id, 
value: null
 }celix_event_entry_t;
 
+typedef struct celix_event_seq_id_cache {
+    struct timespec lastModified;
+    long seqIdBuffer[CELIX_EVENT_ADMIN_MAX_EVENT_SEQ_ID_CACHE_SIZE];
+}celix_event_seq_id_cache_t;
+
 struct celix_event_admin {
     celix_bundle_context_t* ctx;
     celix_log_helper_t* logHelper;
     unsigned int handlerThreadNr;
-    celix_thread_rwlock_t lock;//projects: channels,eventHandlers
+    const char* fwUUID;
+    celix_thread_rwlock_t lock;//projects: 
channels,eventHandlers,eventSeqIdCache,remoteProviderServices

Review Comment:
   and nextSeqId



##########
bundles/event_admin/event_admin/src/celix_event_admin.c:
##########
@@ -415,6 +454,131 @@ int 
celix_eventAdmin_removeEventHandlerWithProperties(void* handle, void* svc, c
     return CELIX_SUCCESS;
 }
 
+int celix_eventAdmin_addRemoteProviderService(void* handle, void* svc, const 
celix_properties_t* props) {
+    assert(handle != NULL);
+    assert(svc != NULL);
+    celix_event_admin_t* ea = handle;
+    long serviceId = celix_properties_getAsLong(props, 
CELIX_FRAMEWORK_SERVICE_ID, -1L);
+    if (serviceId < 0) {
+        celix_logHelper_error(ea->logHelper, "Remote provider service id is 
invalid.");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_longHashMap_put(ea->remoteProviderServices, serviceId, svc);
+    return CELIX_SUCCESS;
+}
+
+int celix_eventAdmin_removeRemoteProviderService(void* handle, void* svc, 
const celix_properties_t* props) {
+    assert(handle != NULL);
+    assert(svc != NULL);
+    celix_event_admin_t* ea = handle;
+    long serviceId = celix_properties_getAsLong(props, 
CELIX_FRAMEWORK_SERVICE_ID, -1L);
+    if (serviceId < 0) {
+        celix_logHelper_error(ea->logHelper, "Remote provider service id is 
invalid.");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_longHashMap_remove(ea->remoteProviderServices, serviceId);
+    return CELIX_SUCCESS;
+}
+
+static void 
celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(celix_event_admin_t* ea) 
{
+    if (celix_stringHashMap_size(ea->eventSeqIdCache) > 16) {
+        celix_string_hash_map_iterator_t iter = 
celix_stringHashMap_begin(ea->eventSeqIdCache);
+        while (!celix_stringHashMapIterator_isEnd(&iter)) {
+            celix_event_seq_id_cache_t* cache = iter.value.ptrValue;
+            if (celix_elapsedtime(CLOCK_MONOTONIC, cache->lastModified) > 
60*60/*1h*/) {
+                celix_stringHashMapIterator_remove(&iter);
+            } else {
+                celix_stringHashMapIterator_next(&iter);
+            }
+        }
+    }
+    return;
+}
+
+static bool celix_eventAdmin_isDuplicateEvent(celix_event_admin_t* ea, const 
char* topic CELIX_UNUSED, const celix_properties_t* properties) {
+    const char* remoteFwUUID = celix_properties_get(properties, 
CELIX_EVENT_REMOTE_FRAMEWORK_UUID, NULL);
+    if (remoteFwUUID == NULL) {
+        return false;
+    }
+    long seqId = celix_properties_getAsLong(properties, 
CELIX_EVENT_REMOTE_SEQ_ID, -1L);
+    if (seqId <= 0) {
+        return false;
+    }
+    long seqIdMod = seqId % CELIX_EVENT_ADMIN_MAX_EVENT_SEQ_ID_CACHE_SIZE;
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_event_seq_id_cache_t* seqIdCache = 
celix_stringHashMap_get(ea->eventSeqIdCache, remoteFwUUID);
+    if (seqIdCache == NULL) {
+        celix_autofree celix_event_seq_id_cache_t* cache = calloc(1, 
sizeof(*cache));
+        if (cache == NULL) {
+            celix_logHelper_error(ea->logHelper, "Failed to create event seq 
id cache for %s.", remoteFwUUID);
+            return false;
+        }
+        celix_status_t status = celix_stringHashMap_put(ea->eventSeqIdCache, 
remoteFwUUID, cache);
+        if (status != CELIX_SUCCESS) {
+            celix_logHelper_error(ea->logHelper, "Failed to add event seq id 
cache for %s.", remoteFwUUID);
+            return false;
+        }
+        seqIdCache = celix_steal_ptr(cache);
+    }
+    seqIdCache->lastModified = celix_gettime(CLOCK_MONOTONIC);
+    if (seqIdCache->seqIdBuffer[seqIdMod] == seqId) {
+        return true;
+    }
+    seqIdCache->seqIdBuffer[seqIdMod] = seqId;
+
+    celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(ea);
+
+    return false;
+}
+
+static long celix_eventAdmin_getEventSeqId(celix_event_admin_t* ea) {
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    long seqId = ea->nextSeqId++;
+    if (seqId <= 0) {
+        seqId = 1;
+        ea->nextSeqId = seqId + 1;
+    }
+    return seqId;
+}
+
+static int celix_eventAdmin_deliverEventToRemote(celix_event_admin_t* ea, 
const char* topic, const celix_properties_t* props, bool async) {
+    celix_autoptr(celix_properties_t) remoteProps = 
celix_properties_copy(props);
+    if (remoteProps == NULL) {
+        celix_logHelper_error(ea->logHelper, "Failed to copy remote properties 
for event %s.", topic);
+        return ENOMEM;
+    }
+    celix_properties_unset(remoteProps, CELIX_EVENT_REMOTE_ENABLE);
+    celix_status_t status = celix_properties_set(remoteProps, 
CELIX_EVENT_REMOTE_FRAMEWORK_UUID, ea->fwUUID);
+    if (status != CELIX_SUCCESS) {
+        celix_logHelper_error(ea->logHelper, "Failed to set remote framework 
uuid for event %s.", topic);
+        return status;
+    }
+    long seqId = celix_eventAdmin_getEventSeqId(ea);
+    status = celix_properties_setLong(remoteProps, CELIX_EVENT_REMOTE_SEQ_ID, 
seqId);
+    if (status != CELIX_SUCCESS) {
+        celix_logHelper_error(ea->logHelper, "Failed to set remote seq id for 
event %s.", topic);
+        return status;
+    }
+    celix_auto(celix_rwlock_rlock_guard_t) rLockGuard = 
celixRwlockRlockGuard_init(&ea->lock);
+    CELIX_LONG_HASH_MAP_ITERATE(ea->remoteProviderServices, iter) {
+        celix_event_remote_provider_service_t* remoteProvider = 
iter.value.ptrValue;
+        if (async) {
+            celix_logHelper_trace(ea->logHelper, "Post event %s to remote 
provider.", topic);
+            status = remoteProvider->postEvent(remoteProvider->handle, topic, 
remoteProps);

Review Comment:
   Note that the `postEvent` and `sendEvent` is called inside a lock. IMO this 
adds a constraint to the remote provider implementations to not call any 
services in the `postEvent` or `sendEvent` , because this could lead to - a 
unintentionally - call to another post/send event which is then forwarded to 
the remote providers -> deadlock. 
   
   Or this should be clearly documented on the remote provider service 
documentation, or the usage of remote providers should be protected with a use 
count. something like (pseudo):
   
   ```
   addRemoteProvider: 
     - create provider entry with a use count of 0
     - lock and add entry to map
   
   removeRemoteProvider:
     - lock 
     - remote provider entry from map
     - unlock
     - wait for use count 0
     - free entry
   
   celix_eventAdmin_deliverEventToRemote:
     - create local array list 
     - lock
     - get entry, increase use count and add entry to local list
     - unlock
     - loop through list and call postEvent or sendEvent
     - lock
     - loop through list and decrease use count
     - unlock
   ```
   
   This ensure that services are called outside a lock, but introduces quite to 
additional complexity. That being said, I would prefer such a solution. 
Especially for remote calls I have encountered quite a few cycling deadlocks.
   
   
   



##########
bundles/event_admin/event_admin/src/celix_event_admin.c:
##########
@@ -415,6 +454,131 @@ int 
celix_eventAdmin_removeEventHandlerWithProperties(void* handle, void* svc, c
     return CELIX_SUCCESS;
 }
 
+int celix_eventAdmin_addRemoteProviderService(void* handle, void* svc, const 
celix_properties_t* props) {
+    assert(handle != NULL);
+    assert(svc != NULL);
+    celix_event_admin_t* ea = handle;
+    long serviceId = celix_properties_getAsLong(props, 
CELIX_FRAMEWORK_SERVICE_ID, -1L);
+    if (serviceId < 0) {
+        celix_logHelper_error(ea->logHelper, "Remote provider service id is 
invalid.");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_longHashMap_put(ea->remoteProviderServices, serviceId, svc);
+    return CELIX_SUCCESS;
+}
+
+int celix_eventAdmin_removeRemoteProviderService(void* handle, void* svc, 
const celix_properties_t* props) {
+    assert(handle != NULL);
+    assert(svc != NULL);
+    celix_event_admin_t* ea = handle;
+    long serviceId = celix_properties_getAsLong(props, 
CELIX_FRAMEWORK_SERVICE_ID, -1L);
+    if (serviceId < 0) {
+        celix_logHelper_error(ea->logHelper, "Remote provider service id is 
invalid.");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_longHashMap_remove(ea->remoteProviderServices, serviceId);
+    return CELIX_SUCCESS;
+}
+
+static void 
celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(celix_event_admin_t* ea) 
{
+    if (celix_stringHashMap_size(ea->eventSeqIdCache) > 16) {
+        celix_string_hash_map_iterator_t iter = 
celix_stringHashMap_begin(ea->eventSeqIdCache);
+        while (!celix_stringHashMapIterator_isEnd(&iter)) {
+            celix_event_seq_id_cache_t* cache = iter.value.ptrValue;
+            if (celix_elapsedtime(CLOCK_MONOTONIC, cache->lastModified) > 
60*60/*1h*/) {
+                celix_stringHashMapIterator_remove(&iter);
+            } else {
+                celix_stringHashMapIterator_next(&iter);
+            }
+        }
+    }
+    return;
+}
+
+static bool celix_eventAdmin_isDuplicateEvent(celix_event_admin_t* ea, const 
char* topic CELIX_UNUSED, const celix_properties_t* properties) {
+    const char* remoteFwUUID = celix_properties_get(properties, 
CELIX_EVENT_REMOTE_FRAMEWORK_UUID, NULL);
+    if (remoteFwUUID == NULL) {
+        return false;
+    }
+    long seqId = celix_properties_getAsLong(properties, 
CELIX_EVENT_REMOTE_SEQ_ID, -1L);
+    if (seqId <= 0) {
+        return false;
+    }
+    long seqIdMod = seqId % CELIX_EVENT_ADMIN_MAX_EVENT_SEQ_ID_CACHE_SIZE;
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_event_seq_id_cache_t* seqIdCache = 
celix_stringHashMap_get(ea->eventSeqIdCache, remoteFwUUID);
+    if (seqIdCache == NULL) {
+        celix_autofree celix_event_seq_id_cache_t* cache = calloc(1, 
sizeof(*cache));
+        if (cache == NULL) {
+            celix_logHelper_error(ea->logHelper, "Failed to create event seq 
id cache for %s.", remoteFwUUID);
+            return false;
+        }
+        celix_status_t status = celix_stringHashMap_put(ea->eventSeqIdCache, 
remoteFwUUID, cache);
+        if (status != CELIX_SUCCESS) {
+            celix_logHelper_error(ea->logHelper, "Failed to add event seq id 
cache for %s.", remoteFwUUID);
+            return false;
+        }
+        seqIdCache = celix_steal_ptr(cache);
+    }
+    seqIdCache->lastModified = celix_gettime(CLOCK_MONOTONIC);
+    if (seqIdCache->seqIdBuffer[seqIdMod] == seqId) {
+        return true;
+    }
+    seqIdCache->seqIdBuffer[seqIdMod] = seqId;
+
+    celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(ea);
+
+    return false;
+}
+
+static long celix_eventAdmin_getEventSeqId(celix_event_admin_t* ea) {
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    long seqId = ea->nextSeqId++;
+    if (seqId <= 0) {
+        seqId = 1;
+        ea->nextSeqId = seqId + 1;
+    }
+    return seqId;
+}
+
+static int celix_eventAdmin_deliverEventToRemote(celix_event_admin_t* ea, 
const char* topic, const celix_properties_t* props, bool async) {
+    celix_autoptr(celix_properties_t) remoteProps = 
celix_properties_copy(props);
+    if (remoteProps == NULL) {
+        celix_logHelper_error(ea->logHelper, "Failed to copy remote properties 
for event %s.", topic);
+        return ENOMEM;
+    }
+    celix_properties_unset(remoteProps, CELIX_EVENT_REMOTE_ENABLE);
+    celix_status_t status = celix_properties_set(remoteProps, 
CELIX_EVENT_REMOTE_FRAMEWORK_UUID, ea->fwUUID);

Review Comment:
   👍 I like the design of an opt-in with CELIX_EVENT_REMOTE_ENABLE and the 
"this is a remote event" indication with CELIX_EVENT_REMOTE_FRAMEWORK_UUID.



##########
bundles/event_admin/event_admin_api/include/celix_event_constants.h:
##########
@@ -106,6 +106,47 @@ extern "C" {
  */
 #define CELIX_EVENT_TIMESTAMP "timestamp"
 
+/**
+ * @brief It is a property of event to indicate the event is a remote event. 
The type of the value for this event property is Boolean.
+ *
+ * If the value is true, the event will be delivered to remote
+ * event handlers and local event handlers, otherwise, the event will be only 
delivered to local event handlers.
+ */
+#define CELIX_EVENT_REMOTE_ENABLE "celix.event.remote.enable"
+
+/**
+ * @brief The QoS of the remote event. The type of the value for this event 
property is integer. It indicates the quality of service of the remote event. 
If the value is not set, the remote provider should use a proper default value.
+ *
+ * The value must be one of the following:
+ * - 0: At most once delivery
+ * - 1: At least once delivery
+ * - 2: Exactly once delivery
+ */
+#define CELIX_EVENT_REMOTE_QOS "celix.event.remote.qos"

Review Comment:
   Should this value not impact whether a sendEvent return a success or not? 



##########
bundles/event_admin/event_admin/src/celix_event_admin.c:
##########
@@ -415,6 +454,131 @@ int 
celix_eventAdmin_removeEventHandlerWithProperties(void* handle, void* svc, c
     return CELIX_SUCCESS;
 }
 
+int celix_eventAdmin_addRemoteProviderService(void* handle, void* svc, const 
celix_properties_t* props) {
+    assert(handle != NULL);
+    assert(svc != NULL);
+    celix_event_admin_t* ea = handle;
+    long serviceId = celix_properties_getAsLong(props, 
CELIX_FRAMEWORK_SERVICE_ID, -1L);
+    if (serviceId < 0) {
+        celix_logHelper_error(ea->logHelper, "Remote provider service id is 
invalid.");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_longHashMap_put(ea->remoteProviderServices, serviceId, svc);
+    return CELIX_SUCCESS;
+}
+
+int celix_eventAdmin_removeRemoteProviderService(void* handle, void* svc, 
const celix_properties_t* props) {
+    assert(handle != NULL);
+    assert(svc != NULL);
+    celix_event_admin_t* ea = handle;
+    long serviceId = celix_properties_getAsLong(props, 
CELIX_FRAMEWORK_SERVICE_ID, -1L);
+    if (serviceId < 0) {
+        celix_logHelper_error(ea->logHelper, "Remote provider service id is 
invalid.");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_longHashMap_remove(ea->remoteProviderServices, serviceId);
+    return CELIX_SUCCESS;
+}
+
+static void 
celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(celix_event_admin_t* ea) 
{
+    if (celix_stringHashMap_size(ea->eventSeqIdCache) > 16) {
+        celix_string_hash_map_iterator_t iter = 
celix_stringHashMap_begin(ea->eventSeqIdCache);
+        while (!celix_stringHashMapIterator_isEnd(&iter)) {
+            celix_event_seq_id_cache_t* cache = iter.value.ptrValue;
+            if (celix_elapsedtime(CLOCK_MONOTONIC, cache->lastModified) > 
60*60/*1h*/) {
+                celix_stringHashMapIterator_remove(&iter);
+            } else {
+                celix_stringHashMapIterator_next(&iter);
+            }
+        }
+    }
+    return;
+}
+
+static bool celix_eventAdmin_isDuplicateEvent(celix_event_admin_t* ea, const 
char* topic CELIX_UNUSED, const celix_properties_t* properties) {
+    const char* remoteFwUUID = celix_properties_get(properties, 
CELIX_EVENT_REMOTE_FRAMEWORK_UUID, NULL);
+    if (remoteFwUUID == NULL) {
+        return false;
+    }
+    long seqId = celix_properties_getAsLong(properties, 
CELIX_EVENT_REMOTE_SEQ_ID, -1L);
+    if (seqId <= 0) {
+        return false;
+    }
+    long seqIdMod = seqId % CELIX_EVENT_ADMIN_MAX_EVENT_SEQ_ID_CACHE_SIZE;
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    celix_event_seq_id_cache_t* seqIdCache = 
celix_stringHashMap_get(ea->eventSeqIdCache, remoteFwUUID);
+    if (seqIdCache == NULL) {
+        celix_autofree celix_event_seq_id_cache_t* cache = calloc(1, 
sizeof(*cache));
+        if (cache == NULL) {
+            celix_logHelper_error(ea->logHelper, "Failed to create event seq 
id cache for %s.", remoteFwUUID);
+            return false;
+        }
+        celix_status_t status = celix_stringHashMap_put(ea->eventSeqIdCache, 
remoteFwUUID, cache);
+        if (status != CELIX_SUCCESS) {
+            celix_logHelper_error(ea->logHelper, "Failed to add event seq id 
cache for %s.", remoteFwUUID);
+            return false;
+        }
+        seqIdCache = celix_steal_ptr(cache);
+    }
+    seqIdCache->lastModified = celix_gettime(CLOCK_MONOTONIC);
+    if (seqIdCache->seqIdBuffer[seqIdMod] == seqId) {
+        return true;
+    }
+    seqIdCache->seqIdBuffer[seqIdMod] = seqId;
+
+    celix_eventAdmin_retrieveLongTimeUnusedEventSeqIdCache(ea);
+
+    return false;
+}
+
+static long celix_eventAdmin_getEventSeqId(celix_event_admin_t* ea) {
+    celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = 
celixRwlockWlockGuard_init(&ea->lock);
+    long seqId = ea->nextSeqId++;
+    if (seqId <= 0) {
+        seqId = 1;
+        ea->nextSeqId = seqId + 1;
+    }
+    return seqId;
+}
+
+static int celix_eventAdmin_deliverEventToRemote(celix_event_admin_t* ea, 
const char* topic, const celix_properties_t* props, bool async) {
+    celix_autoptr(celix_properties_t) remoteProps = 
celix_properties_copy(props);
+    if (remoteProps == NULL) {
+        celix_logHelper_error(ea->logHelper, "Failed to copy remote properties 
for event %s.", topic);
+        return ENOMEM;
+    }
+    celix_properties_unset(remoteProps, CELIX_EVENT_REMOTE_ENABLE);
+    celix_status_t status = celix_properties_set(remoteProps, 
CELIX_EVENT_REMOTE_FRAMEWORK_UUID, ea->fwUUID);
+    if (status != CELIX_SUCCESS) {
+        celix_logHelper_error(ea->logHelper, "Failed to set remote framework 
uuid for event %s.", topic);
+        return status;
+    }
+    long seqId = celix_eventAdmin_getEventSeqId(ea);
+    status = celix_properties_setLong(remoteProps, CELIX_EVENT_REMOTE_SEQ_ID, 
seqId);
+    if (status != CELIX_SUCCESS) {
+        celix_logHelper_error(ea->logHelper, "Failed to set remote seq id for 
event %s.", topic);
+        return status;
+    }
+    celix_auto(celix_rwlock_rlock_guard_t) rLockGuard = 
celixRwlockRlockGuard_init(&ea->lock);
+    CELIX_LONG_HASH_MAP_ITERATE(ea->remoteProviderServices, iter) {
+        celix_event_remote_provider_service_t* remoteProvider = 
iter.value.ptrValue;
+        if (async) {
+            celix_logHelper_trace(ea->logHelper, "Post event %s to remote 
provider.", topic);
+            status = remoteProvider->postEvent(remoteProvider->handle, topic, 
remoteProps);
+        } else {
+            celix_logHelper_trace(ea->logHelper, "Send event %s to remote 
provider.", topic);
+            status = remoteProvider->sendEvent(remoteProvider->handle, topic, 
remoteProps);
+        }
+        if (status != CELIX_SUCCESS) {
+            celix_logHelper_error(ea->logHelper, "Failed to deliver event %s 
to remote provider(%ld).", topic, iter.key);

Review Comment:
   Although I think are guidelines could be more clearly on what log levels to 
use:
   
   > For log levels use the following guidelines:
   - trace: Use this level for very detailed that you would only want to have 
while diagnosing problems. 
   - debug: This level should be used for information that might be helpful in 
diagnosing problems or understanding 
     what's going on, but is too verbose to be enabled by default. 
   - info: Use this level for general operational messages that aren't tied to 
any specific problem or error condition. 
     They provide insight into the normal behavior of the system. Examples 
include startup/shutdown messages, 
     configuration assumptions, etc.
   - warning: Use this level to report an issue from which the system can 
recover, but which might indicate a potential 
     problem.
   - error: This level should be used to report issues that need immediate 
attention and might prevent the system from 
     functioning correctly. These are problems that are unexpected and affect 
functionality, but not so severe that the 
     process needs to stop. Examples include runtime errors, inability to 
connect to a service, etc.
   - fatal: Use this level to report severe errors that prevent the program 
from continuing to run. 
     After logging a fatal error, the program will typically terminate.
   
   IMO this should be warning, because the system cannot deliver an event 
remotely. This is potentially a problem, but  system can continue working 
locally. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@celix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to