abroekhuis commented on a change in pull request #165: Feature/framework 
racecondition
URL: https://github.com/apache/celix/pull/165#discussion_r390855760
 
 

 ##########
 File path: libs/framework/src/service_registry.c
 ##########
 @@ -972,4 +1032,249 @@ bool celix_serviceRegistry_getServiceInfo(
     celixThreadRwlock_unlock(&registry->lock);
 
     return found;
-}
\ No newline at end of file
+}
+
+celix_status_t 
celix_serviceRegistry_addServiceListener(celix_service_registry_t *registry, 
celix_bundle_t *bundle, const char *stringFilter, celix_service_listener_t 
*listener) {
+
+    celix_filter_t *filter = NULL;
+    if (stringFilter != NULL) {
+        filter = celix_filter_create(stringFilter);
+        if (filter == NULL) {
+            fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "Cannot add service 
listener filter '%s' is invalid", stringFilter);
+            return CELIX_ILLEGAL_ARGUMENT;
+        }
+    }
+
+    celix_service_registry_service_listener_entry_t *entry = calloc(1, 
sizeof(*entry));
+    entry->bundle = bundle;
+    entry->filter = filter;
+    entry->listener = listener;
+    entry->useCount = 1; //new entry -> count on 1
+    celixThreadMutex_create(&entry->mutex, NULL);
+    celixThreadCondition_init(&entry->cond, NULL);
+
+    celix_array_list_t *registrations =  celix_arrayList_create();
+
+    celixThreadRwlock_writeLock(&registry->lock);
+    celix_arrayList_add(registry->serviceListeners, entry); //use count 1
+
+    //find already registered services
+    hash_map_iterator_t iter = 
hashMapIterator_construct(registry->serviceRegistrations);
+    while (hashMapIterator_hasNext(&iter)) {
+        celix_array_list_t *regs = (array_list_pt) 
hashMapIterator_nextValue(&iter);
+        for (int regIdx = 0; (regs != NULL) && regIdx < 
celix_arrayList_size(regs); ++regIdx) {
+            service_registration_pt registration = celix_arrayList_get(regs, 
regIdx);
+            properties_pt props = NULL;
+            serviceRegistration_getProperties(registration, &props);
+            if (celix_filter_match(filter, props)) {
+                serviceRegistration_retain(registration);
+                long svcId = serviceRegistration_getServiceId(registration);
+                celix_arrayList_add(registrations, registration);
+                //update pending register event count
+                celix_increasePendingRegisteredEvent(registry, svcId);
+            }
+        }
+    }
+    celixThreadRwlock_unlock(&registry->lock);
+
+    //NOTE there is a race condition with 
serviceRegistry_registerServiceInternal, as result
+    //a REGISTERED event can be triggered twice instead of once. The service 
tracker can deal with this.
+    //The handling of pending registered events is to ensure that the 
UNREGISTERING event is always
+    //after the 1 or 2 REGISTERED events.
+
+    for (int i = 0; i < celix_arrayList_size(registrations); ++i) {
+        service_registration_pt reg = celix_arrayList_get(registrations, i);
+        long svcId = serviceRegistration_getServiceId(reg);
+        service_reference_pt ref = NULL;
+        serviceRegistry_getServiceReference_internal(registry, bundle, reg, 
&ref);
+        celix_service_event_t event;
+        event.reference = ref;
+        event.type = OSGI_FRAMEWORK_SERVICE_EVENT_REGISTERED;
+        listener->serviceChanged(listener->handle, &event);
+        serviceReference_release(ref, NULL);
+        serviceRegistration_release(reg);
+
+        //update pending register event count
+        celix_decreasePendingRegisteredEvent(registry, svcId);
+    }
+    celix_arrayList_destroy(registrations);
+
+    serviceRegistry_callHooksForListenerFilter(registry, bundle, 
entry->filter, false);
+
+    celix_decreaseCountServiceListener(entry); //use count decreased, can be 0
+    return CELIX_SUCCESS;
+}
+
+celix_status_t 
celix_serviceRegistry_removeServiceListener(celix_service_registry_t *registry, 
celix_service_listener_t *listener) {
+    celix_service_registry_service_listener_entry_t *entry = NULL;
+
+    celixThreadRwlock_writeLock(&registry->lock);
+    for (int i = 0; i < celix_arrayList_size(registry->serviceListeners); ++i) 
{
+        celix_service_registry_service_listener_entry_t *visit = 
celix_arrayList_get(registry->serviceListeners, i);
+        if (visit->listener == listener) {
+            entry = visit;
+            celix_arrayList_removeAt(registry->serviceListeners, i);
+            break;
+        }
+    }
+    celixThreadRwlock_unlock(&registry->lock);
+
+    if (entry != NULL) {
+        serviceRegistry_callHooksForListenerFilter(registry, entry->bundle, 
entry->filter, true);
+        celix_waitAndDestroyServiceListener(entry);
+    } else {
+        fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "Cannot remove service 
listener, listener not found");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    return CELIX_SUCCESS;
+}
+
+static void celix_serviceRegistry_serviceChanged(celix_service_registry_t 
*registry, celix_service_event_type_t eventType, service_registration_pt 
registration) {
+    celix_service_registry_service_listener_entry_t *entry;
+
+    celix_array_list_t* retainedEntries = celix_arrayList_create();
+    celix_array_list_t* matchedEntries = celix_arrayList_create();
+
+    celixThreadRwlock_readLock(&registry->lock);
+    for (int i = 0; i < celix_arrayList_size(registry->serviceListeners); ++i) 
{
+        entry = celix_arrayList_get(registry->serviceListeners, i);
+        celix_arrayList_add(retainedEntries, entry);
+        celix_increaseCountServiceListener(entry); //ensure that use count > 
0, so that the listener cannot be destroyed until all pending event are handled.
+    }
+    celixThreadRwlock_unlock(&registry->lock);
+
+    for (int i = 0; i < celix_arrayList_size(retainedEntries); ++i) {
+        entry = celix_arrayList_get(retainedEntries, i);
+        int matched = 0;
+        celix_properties_t *props = NULL;
+        bool matchResult = false;
+        serviceRegistration_getProperties(registration, &props);
+        if (entry->filter != NULL) {
+            filter_match(entry->filter, props, &matchResult);
+        }
+        matched = (entry->filter == NULL) || matchResult;
+        if (matched) {
+            celix_arrayList_add(matchedEntries, entry);
+        } else {
+            celix_decreaseCountServiceListener(entry); //Not a match -> 
release entry
+        }
+    }
+    celix_arrayList_destroy(retainedEntries);
+
+    /*
+     * TODO FIXME, A deadlock can happen when (e.g.) a service is 
deregistered, triggering this fw_serviceChanged and
+     * one of the matching service listener callbacks tries to remove an other 
matched service listener.
+     * The remove service listener will call the listener_waitForDestroy and 
the fw_serviceChanged part keeps the
+     * usageCount on > 0.
+     *
+     * Not sure how to prevent/handle this.
+     */
+
+    for (int i = 0; i < celix_arrayList_size(matchedEntries); ++i) {
+        entry = celix_arrayList_get(matchedEntries, i);
+        service_reference_pt reference = NULL;
+        celix_service_event_t event;
+        serviceRegistry_getServiceReference(registry, entry->bundle, 
registration, &reference);
+        event.type = eventType;
+        event.reference = reference;
+        entry->listener->serviceChanged(entry->listener->handle, &event);
+        serviceRegistry_ungetServiceReference(registry, entry->bundle, 
reference);
+        celix_decreaseCountServiceListener(entry); //decrease usage, so that 
the listener can be destroyed (if use count is now 0)
+    }
+    celix_arrayList_destroy(matchedEntries);
+}
+
+
+static void celix_increasePendingRegisteredEvent(celix_service_registry_t 
*registry, long svcId) {
+    celixThreadMutex_lock(&registry->pendingRegisterEvents.mutex);
+    long count = (long)hashMap_get(registry->pendingRegisterEvents.map, 
(void*)svcId);
+    count += 1;
+    hashMap_put(registry->pendingRegisterEvents.map, (void*)svcId, 
(void*)count);
+    celixThreadMutex_unlock(&registry->pendingRegisterEvents.mutex);
+}
+
+static void celix_decreasePendingRegisteredEvent(celix_service_registry_t 
*registry, long svcId) {
+    celixThreadMutex_lock(&registry->pendingRegisterEvents.mutex);
+    long count = (long)hashMap_get(registry->pendingRegisterEvents.map, 
(void*)svcId);
+    assert(count >= 1);
+    count -= 1;
+    if (count > 0) {
+        hashMap_put(registry->pendingRegisterEvents.map, (void *)svcId, (void 
*)count);
+    } else {
+        hashMap_remove(registry->pendingRegisterEvents.map, (void*)svcId);
+    }
+    celixThreadCondition_signal(&registry->pendingRegisterEvents.cond);
+    celixThreadMutex_unlock(&registry->pendingRegisterEvents.mutex);
+}
+
+static void celix_waitForPendingRegisteredEvents(celix_service_registry_t 
*registry, long svcId) {
+    celixThreadMutex_lock(&registry->pendingRegisterEvents.mutex);
+    long count = (long)hashMap_get(registry->pendingRegisterEvents.map, 
(void*)svcId);
+    while (count > 0) {
+        celixThreadCondition_wait(&registry->pendingRegisterEvents.cond, 
&registry->pendingRegisterEvents.mutex);
+        count = (long)hashMap_get(registry->pendingRegisterEvents.map, 
(void*)svcId);
+    }
+    celixThreadMutex_unlock(&registry->pendingRegisterEvents.mutex);
+}
+
+//static void 
celix_serviceRegistry_triggerListenerHooks(celix_service_registry_t *registry, 
const char* serviceName, const celix_properties_t *properties) {
 
 Review comment:
   Commented out code?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to