CELIX-237: Forward the remove-endpoint call from the RSA to import_registration. Removed the status check for the mutex unlock in the topology manager (topology_manager.c); that check could cause deadlocks when an error occurred.
Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/a477ab96 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/a477ab96 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/a477ab96 Branch: refs/heads/develop Commit: a477ab96cf01896d360b71823e6302b942271840 Parents: 313452d Author: Pepijn Noltes <[email protected]> Authored: Wed Sep 16 12:45:19 2015 +0200 Committer: Pepijn Noltes <[email protected]> Committed: Wed Sep 16 12:45:19 2015 +0200 ---------------------------------------------------------------------- .../private/include/import_registration_dfi.h | 3 +- .../rsa/private/src/import_registration_dfi.c | 40 ++++--- .../rsa/private/src/remote_service_admin_dfi.c | 52 ++-------- .../private/src/topology_manager.c | 104 +++++++++---------- 4 files changed, 86 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/a477ab96/remote_services/remote_service_admin_dfi/rsa/private/include/import_registration_dfi.h ---------------------------------------------------------------------- diff --git a/remote_services/remote_service_admin_dfi/rsa/private/include/import_registration_dfi.h b/remote_services/remote_service_admin_dfi/rsa/private/include/import_registration_dfi.h index 6f2a232..6aa1b26 100644 --- a/remote_services/remote_service_admin_dfi/rsa/private/include/import_registration_dfi.h +++ b/remote_services/remote_service_admin_dfi/rsa/private/include/import_registration_dfi.h @@ -10,8 +10,7 @@ typedef void (*send_func_type)(void *handle, endpoint_description_pt endpointDescription, char *request, char **reply, int* replyStatus); -celix_status_t importRegistration_create(bundle_context_pt context, void (*rsaCallback)(void *, import_registration_pt), - void *rsaHandle, endpoint_description_pt description, const char *classObject, +celix_status_t importRegistration_create(bundle_context_pt 
context, endpoint_description_pt description, const char *classObject, import_registration_pt *import); void importRegistration_destroy(import_registration_pt import); http://git-wip-us.apache.org/repos/asf/celix/blob/a477ab96/remote_services/remote_service_admin_dfi/rsa/private/src/import_registration_dfi.c ---------------------------------------------------------------------- diff --git a/remote_services/remote_service_admin_dfi/rsa/private/src/import_registration_dfi.c b/remote_services/remote_service_admin_dfi/rsa/private/src/import_registration_dfi.c index d192e29..56144d9 100644 --- a/remote_services/remote_service_admin_dfi/rsa/private/src/import_registration_dfi.c +++ b/remote_services/remote_service_admin_dfi/rsa/private/src/import_registration_dfi.c @@ -10,6 +10,9 @@ struct import_registration { bundle_context_pt context; endpoint_description_pt endpoint; //TODO owner? -> free when destroyed const char *classObject; //NOTE owned by endpoint + + celix_thread_mutex_t mutex; //protects send & sendhandle + send_func_type send; void *sendHandle; @@ -17,9 +20,6 @@ struct import_registration { service_registration_pt factoryReg; hash_map_pt proxies; //key -> bundle, value -> service_proxy - - void (*rsaCloseImportCallback)(void *, import_registration_pt); - void *rsaHandle; }; struct service_proxy { @@ -33,8 +33,7 @@ static celix_status_t importRegistration_createProxy(import_registration_pt impo static void importRegistration_proxyFunc(void *userData, void *args[], void *returnVal); static void importRegistration_destroyProxy(struct service_proxy *proxy); -celix_status_t importRegistration_create(bundle_context_pt context, void (*rsaCallback)(void *, import_registration_pt), - void *rsaHandle, endpoint_description_pt endpoint, const char *classObject, +celix_status_t importRegistration_create(bundle_context_pt context, endpoint_description_pt endpoint, const char *classObject, import_registration_pt *out) { celix_status_t status = CELIX_SUCCESS; 
import_registration_pt reg = calloc(1, sizeof(*reg)); @@ -45,11 +44,10 @@ celix_status_t importRegistration_create(bundle_context_pt context, void (*rsaCa if (reg != NULL && reg->factory != NULL) { reg->context = context; - reg->rsaCloseImportCallback = rsaCallback; - reg->rsaHandle = rsaHandle; reg->endpoint = endpoint; reg->classObject = classObject; reg->proxies = hashMap_create(NULL, NULL, NULL, NULL); + celixThreadMutex_create(&reg->mutex, NULL); reg->factory->factory = reg; reg->factory->getService = (void *)importRegistration_getService; @@ -70,8 +68,10 @@ celix_status_t importRegistration_create(bundle_context_pt context, void (*rsaCa celix_status_t importRegistration_setSendFn(import_registration_pt reg, send_func_type send, void *handle) { + celixThreadMutex_lock(&reg->mutex); reg->send = send; reg->sendHandle = handle; + celixThreadMutex_unlock(&reg->mutex); return CELIX_SUCCESS; } @@ -79,7 +79,12 @@ celix_status_t importRegistration_setSendFn(import_registration_pt reg, void importRegistration_destroy(import_registration_pt import) { if (import != NULL) { if (import->proxies != NULL) { - //TODO destroy proxies + hash_map_iterator_pt iter = hashMapIterator_create(import->proxies); + while (hashMapIterator_hasNext(iter)) { + struct service_proxy *proxy = hashMapIterator_nextEntry(iter); + importRegistration_destroyProxy(proxy); + } + hashMapIterator_destroy(iter); hashMap_destroy(import->proxies, false, false); import->proxies = NULL; } @@ -121,6 +126,7 @@ celix_status_t importRegistration_getService(import_registration_pt import, bund printf("getting service for bundle '%s'\n", name); */ + struct service_proxy *proxy = hashMap_get(import->proxies, bundle); //TODO lock if (proxy == NULL) { status = importRegistration_createProxy(import, bundle, &proxy); @@ -207,13 +213,8 @@ static celix_status_t importRegistration_createProxy(import_registration_pt impo dynInterface_destroy(proxy->intf); proxy->intf = NULL; } - if (proxy->service != NULL) { - free(proxy->service); 
- proxy->service = NULL; - } - if (proxy != NULL) { - free(proxy); - } + free(proxy->service); + free(proxy); } return status; @@ -235,11 +236,16 @@ static void importRegistration_proxyFunc(void *userData, void *args[], void *ret //printf("Need to send following json '%s'\n", invokeRequest); } + if (status == CELIX_SUCCESS) { char *reply = NULL; int rc = 0; //printf("sending request\n"); - import->send(import->sendHandle, import->endpoint, invokeRequest, &reply, &rc); + celixThreadMutex_lock(&import->mutex); + if (import->send != NULL) { + import->send(import->sendHandle, import->endpoint, invokeRequest, &reply, &rc); + } + celixThreadMutex_unlock(&import->mutex); //printf("request sended. got reply '%s' with status %i\n", reply, rc); if (rc == 0) { @@ -295,7 +301,7 @@ static void importRegistration_destroyProxy(struct service_proxy *proxy) { celix_status_t importRegistration_close(import_registration_pt registration) { celix_status_t status = CELIX_SUCCESS; - registration->rsaCloseImportCallback(registration->rsaHandle, registration); + importRegistration_stop(registration); return status; } http://git-wip-us.apache.org/repos/asf/celix/blob/a477ab96/remote_services/remote_service_admin_dfi/rsa/private/src/remote_service_admin_dfi.c ---------------------------------------------------------------------- diff --git a/remote_services/remote_service_admin_dfi/rsa/private/src/remote_service_admin_dfi.c b/remote_services/remote_service_admin_dfi/rsa/private/src/remote_service_admin_dfi.c index 8eadbdb..c12d0aa 100644 --- a/remote_services/remote_service_admin_dfi/rsa/private/src/remote_service_admin_dfi.c +++ b/remote_services/remote_service_admin_dfi/rsa/private/src/remote_service_admin_dfi.c @@ -173,7 +173,7 @@ celix_status_t remoteServiceAdmin_create(bundle_context_pt context, remote_servi do { char newPort[10]; - const char *options[] = { "listening_ports", port, NULL}; + const char *options[] = { "listening_ports", port, "num_threads", "5", NULL}; (*admin)->ctx = 
mg_start(&callbacks, (*admin), options); @@ -581,7 +581,7 @@ celix_status_t remoteServiceAdmin_importService(remote_service_admin_pt admin, e import_registration_pt import = NULL; if (objectClass != NULL) { - status = importRegistration_create(admin->context, NULL, NULL, endpointDescription, objectClass, &import); + status = importRegistration_create(admin->context, endpointDescription, objectClass, &import); } if (status == CELIX_SUCCESS) { importRegistration_setSendFn(import, remoteServiceAdmin_send, admin); @@ -591,9 +591,9 @@ celix_status_t remoteServiceAdmin_importService(remote_service_admin_pt admin, e status = importRegistration_start(import); } - //celixThreadMutex_lock(&admin->importedServicesLock); - //TODO add to list - //celixThreadMutex_unlock(&admin->importedServicesLock); + celixThreadMutex_lock(&admin->importedServicesLock); + hashMap_put(admin->importedServices, import, import); + celixThreadMutex_unlock(&admin->importedServicesLock); if (status == CELIX_SUCCESS) { *out = import; @@ -605,44 +605,14 @@ celix_status_t remoteServiceAdmin_importService(remote_service_admin_pt admin, e celix_status_t remoteServiceAdmin_removeImportedService(remote_service_admin_pt admin, import_registration_pt registration) { celix_status_t status = CELIX_SUCCESS; - return status; - /* - - endpoint_description_pt endpointDescription = (endpoint_description_pt) registration->endpointDescription; - import_registration_factory_pt registration_factory = NULL; - - celixThreadMutex_lock(&admin->importedServicesLock); - - registration_factory = (import_registration_factory_pt) hashMap_get(admin->importedServices, endpointDescription->service); - - // factory available - if ((registration_factory == NULL) || (registration_factory->trackedFactory == NULL)) - { - logHelper_log(admin->loghelper, OSGI_LOGSERVICE_ERROR, "RSA: Error while retrieving registration factory for imported service %s", endpointDescription->service); - } - else - { - 
registration_factory->trackedFactory->unregisterProxyService(registration_factory->trackedFactory->factory, endpointDescription); - arrayList_removeElement(registration_factory->registrations, registration); - importRegistration_destroy(registration); - if (arrayList_isEmpty(registration_factory->registrations)) - { - logHelper_log(admin->loghelper, OSGI_LOGSERVICE_INFO, "RSA: closing proxy."); - - serviceTracker_close(registration_factory->proxyFactoryTracker); - importRegistrationFactory_close(registration_factory); - - hashMap_remove(admin->importedServices, endpointDescription->service); - - importRegistrationFactory_destroy(&registration_factory); - } - } - - celixThreadMutex_unlock(&admin->importedServicesLock); + celixThreadMutex_lock(&admin->importedServicesLock); + importRegistration_close(registration); + //importRegistration_destroy(registration); TODO enable & debug -> segfault + hashMap_remove(admin->importedServices, registration); + celixThreadMutex_unlock(&admin->importedServicesLock); - return status; - */ + return status; } http://git-wip-us.apache.org/repos/asf/celix/blob/a477ab96/remote_services/topology_manager/private/src/topology_manager.c ---------------------------------------------------------------------- diff --git a/remote_services/topology_manager/private/src/topology_manager.c b/remote_services/topology_manager/private/src/topology_manager.c index 3b9b62f..f78bde5 100644 --- a/remote_services/topology_manager/private/src/topology_manager.c +++ b/remote_services/topology_manager/private/src/topology_manager.c @@ -624,60 +624,59 @@ celix_status_t topologyManager_endpointListenerRemoved(void * handle, service_re celix_status_t status; topology_manager_pt manager = handle; - status = celixThreadMutex_lock(&manager->listenerListLock); - - if (status == CELIX_SUCCESS) { - if (hashMap_remove(manager->listenerList, reference)) { - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "EndpointListener Removed"); - } + 
celixThreadMutex_lock(&manager->listenerListLock); - status = celixThreadMutex_unlock(&manager->listenerListLock); + if (hashMap_remove(manager->listenerList, reference)) { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "EndpointListener Removed"); } + + celixThreadMutex_unlock(&manager->listenerListLock); + + return status; } celix_status_t topologyManager_notifyListenersEndpointAdded(topology_manager_pt manager, remote_service_admin_service_pt rsa, array_list_pt registrations) { celix_status_t status; - status = celixThreadMutex_lock(&manager->listenerListLock); + celixThreadMutex_lock(&manager->listenerListLock); - if (status == CELIX_SUCCESS) { - hash_map_iterator_pt iter = hashMapIterator_create(manager->listenerList); - while (hashMapIterator_hasNext(iter)) { - char *scope = NULL; - endpoint_listener_pt epl = NULL; - service_reference_pt reference = hashMapIterator_nextKey(iter); - serviceReference_getProperty(reference, (char *) OSGI_ENDPOINT_LISTENER_SCOPE, &scope); + hash_map_iterator_pt iter = hashMapIterator_create(manager->listenerList); + while (hashMapIterator_hasNext(iter)) { + char *scope = NULL; + endpoint_listener_pt epl = NULL; + service_reference_pt reference = hashMapIterator_nextKey(iter); - status = bundleContext_getService(manager->context, reference, (void **) &epl); - if (status == CELIX_SUCCESS) { - filter_pt filter = filter_create(scope); - - int regSize = arrayList_size(registrations); - for (int regIt = 0; regIt < regSize; regIt++) { - export_registration_pt export = arrayList_get(registrations, regIt); - - endpoint_description_pt endpoint = NULL; - status = topologyManager_getEndpointDescriptionForExportRegistration(rsa, export, &endpoint); - if (status == CELIX_SUCCESS) { - bool matchResult = false; - filter_match(filter, endpoint->properties, &matchResult); - if (matchResult) { - status = epl->endpointAdded(epl->handle, endpoint, scope); - } + serviceReference_getProperty(reference, (char *) OSGI_ENDPOINT_LISTENER_SCOPE, 
&scope); + + status = bundleContext_getService(manager->context, reference, (void **) &epl); + if (status == CELIX_SUCCESS) { + filter_pt filter = filter_create(scope); + + int regSize = arrayList_size(registrations); + for (int regIt = 0; regIt < regSize; regIt++) { + export_registration_pt export = arrayList_get(registrations, regIt); + + endpoint_description_pt endpoint = NULL; + status = topologyManager_getEndpointDescriptionForExportRegistration(rsa, export, &endpoint); + if (status == CELIX_SUCCESS) { + bool matchResult = false; + filter_match(filter, endpoint->properties, &matchResult); + if (matchResult) { + status = epl->endpointAdded(epl->handle, endpoint, scope); } } - - filter_destroy(filter); } + + filter_destroy(filter); } + } - hashMapIterator_destroy(iter); + hashMapIterator_destroy(iter); - status = celixThreadMutex_unlock(&manager->listenerListLock); - } + celixThreadMutex_unlock(&manager->listenerListLock); return status; } @@ -686,35 +685,34 @@ celix_status_t topologyManager_notifyListenersEndpointRemoved(topology_manager_p celix_status_t status; - status = celixThreadMutex_lock(&manager->listenerListLock); + celixThreadMutex_lock(&manager->listenerListLock); - if (status == CELIX_SUCCESS) { - hash_map_iterator_pt iter = hashMapIterator_create(manager->listenerList); - while (hashMapIterator_hasNext(iter)) { - endpoint_description_pt endpoint = NULL; - endpoint_listener_pt epl = NULL; - char *scope = NULL; + hash_map_iterator_pt iter = hashMapIterator_create(manager->listenerList); + while (hashMapIterator_hasNext(iter)) { + endpoint_description_pt endpoint = NULL; + endpoint_listener_pt epl = NULL; + char *scope = NULL; - service_reference_pt reference = hashMapIterator_nextKey(iter); - serviceReference_getProperty(reference, (char *) OSGI_ENDPOINT_LISTENER_SCOPE, &scope); + service_reference_pt reference = hashMapIterator_nextKey(iter); + serviceReference_getProperty(reference, (char *) OSGI_ENDPOINT_LISTENER_SCOPE, &scope); - status = 
bundleContext_getService(manager->context, reference, (void **) &epl); + status = bundleContext_getService(manager->context, reference, (void **) &epl); - if (status == CELIX_SUCCESS) { - status = topologyManager_getEndpointDescriptionForExportRegistration(rsa, export, &endpoint); - } + if (status == CELIX_SUCCESS) { + status = topologyManager_getEndpointDescriptionForExportRegistration(rsa, export, &endpoint); + } - if (status == CELIX_SUCCESS) { - status = epl->endpointRemoved(epl->handle, endpoint, NULL); - } + if (status == CELIX_SUCCESS) { + status = epl->endpointRemoved(epl->handle, endpoint, NULL); } hashMapIterator_destroy(iter); - status = celixThreadMutex_unlock(&manager->listenerListLock); } + celixThreadMutex_unlock(&manager->listenerListLock); + return status; }
