This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch hotfix/rsa_deadlock in repository https://gitbox.apache.org/repos/asf/celix.git
commit 5bf1426256ea53a6cc68c9a861c37cb62682d08c Author: Pepijn Noltes <[email protected]> AuthorDate: Tue Sep 8 20:49:12 2020 +0200 Adds a stop service export thread for rsa to prevent deadlocks --- .../src/export_registration_dfi.c | 24 +++--- .../src/export_registration_dfi.h | 2 +- .../src/remote_service_admin_dfi.c | 94 +++++++++++++++++++++- 3 files changed, 104 insertions(+), 16 deletions(-) diff --git a/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.c b/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.c index 39b43b3..5afa1a3 100644 --- a/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.c +++ b/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.c @@ -19,7 +19,6 @@ #include <jansson.h> #include <dyn_interface.h> -#include <json_serializer.h> #include <remote_constants.h> #include <remote_service_admin.h> #include <service_tracker_customizer.h> @@ -44,6 +43,7 @@ struct export_registration { celix_thread_mutex_t mutex; + bool active; //protected by mutex void *service; //protected by mutex long trackerId; //protected by mutex @@ -86,6 +86,7 @@ celix_status_t exportRegistration_create(celix_log_helper_t *helper, service_ref reg->logFile = logFile; reg->servId = strndup(servId, 1024); reg->trackerId = -1L; + reg->active = true; remoteInterceptorsHandler_create(context, &reg->interceptorsHandler); @@ -139,8 +140,11 @@ celix_status_t exportRegistration_call(export_registration_t *export, char *data bool cont = remoteInterceptorHandler_invokePreExportCall(export->interceptorsHandler, export->exportReference.endpoint->properties, sig, &metadata); if (cont) { celixThreadMutex_lock(&export->mutex); - if (export->service != NULL) { + if (export->active && export->service != NULL) { status = jsonRpc_call(export->intf, export->service, data, responseOut); + } else if (!export->active) { + status = CELIX_ILLEGAL_STATE; + celix_logHelper_warning(export->helper,
"Cannot call an inactive service export"); } else { status = CELIX_ILLEGAL_STATE; celix_logHelper_error(export->helper, "export service pointer is NULL"); @@ -245,7 +249,7 @@ celix_status_t exportRegistration_start(export_registration_t *reg) { celixThreadMutex_unlock(&reg->mutex); if (prevTrkId >= 0) { - celix_logHelper_error(reg->helper, "Error staring export registration. The export registration already has an active service tracker"); + celix_logHelper_error(reg->helper, "Error starting export registration. The export registration already had an active service tracker"); celix_bundleContext_stopTracker(reg->context, prevTrkId); } @@ -268,6 +272,12 @@ celix_status_t exportRegistration_stop(export_registration_t *reg) { return status; } +void exportRegistration_setActive(export_registration_t *reg, bool active) { + celixThreadMutex_lock(&reg->mutex); + reg->active = active; + celixThreadMutex_unlock(&reg->mutex); +} + static void exportRegistration_addServ(void *data, void *service) { export_registration_t *reg = data; celixThreadMutex_lock(&reg->mutex); @@ -284,14 +294,6 @@ static void exportRegistration_removeServ(void *data, void *service) { celixThreadMutex_unlock(&reg->mutex); } - -celix_status_t exportRegistration_close(export_registration_t *reg) { - celix_status_t status = CELIX_SUCCESS; - exportRegistration_stop(reg); - return status; -} - - celix_status_t exportRegistration_getException(export_registration_t *registration) { celix_status_t status = CELIX_SUCCESS; //TODO diff --git a/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.h b/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.h index 2333c4c..8bc40eb 100644 --- a/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.h +++ b/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.h @@ -26,11 +26,11 @@ #include "endpoint_description.h" celix_status_t exportRegistration_create(celix_log_helper_t *helper,
service_reference_pt reference, endpoint_description_t *endpoint, celix_bundle_context_t *context, FILE *logFile, export_registration_t **registration); -celix_status_t exportRegistration_close(export_registration_t *registration); void exportRegistration_destroy(export_registration_t *registration); celix_status_t exportRegistration_start(export_registration_t *registration); celix_status_t exportRegistration_stop(export_registration_t *registration); +void exportRegistration_setActive(export_registration_t *registration, bool active); celix_status_t exportRegistration_call(export_registration_t *export, char *data, int datalength, celix_properties_t *metadata, char **response, int *responseLength); diff --git a/bundles/remote_services/remote_service_admin_dfi/src/remote_service_admin_dfi.c b/bundles/remote_services/remote_service_admin_dfi/src/remote_service_admin_dfi.c index f690a31..53620df 100644 --- a/bundles/remote_services/remote_service_admin_dfi/src/remote_service_admin_dfi.c +++ b/bundles/remote_services/remote_service_admin_dfi/src/remote_service_admin_dfi.c @@ -19,6 +19,7 @@ #include <stdio.h> #include <stdlib.h> +#include <unistd.h> #include <arpa/inet.h> #include <netdb.h> @@ -56,6 +57,16 @@ #define RSA_LOG_DEBUG(admin, msg, ...) \ celix_logHelper_log((admin)->loghelper, CELIX_LOG_LEVEL_ERROR, (msg), ##__VA_ARGS__) + +/** + * If set to true the rsa will create a thread to handle stopping of service export. + * + * This stop thread can be removed when the branch feature/async_svc_registration is merged and + * celix_bundleContext_stopTrackerAsync is available. 
+ * + */ +#define CELIX_RSA_USE_STOP_EXPORT_THREAD false + struct remote_service_admin { celix_bundle_context_t *context; celix_log_helper_t *loghelper; @@ -63,6 +74,13 @@ struct remote_service_admin { celix_thread_rwlock_t exportedServicesLock; hash_map_pt exportedServices; + //NOTE stopExportsMutex, stopExports, stopExportsActive, stopExportsCond and stopExportsThread are only used if CELIX_RSA_USE_STOP_EXPORT_THREAD is set to true + celix_thread_mutex_t stopExportsMutex; + celix_array_list_t *stopExports; + bool stopExportsActive; + celix_thread_cond_t stopExportsCond; + celix_thread_t stopExportsThread; + celix_thread_mutex_t importedServicesLock; array_list_pt importedServices; @@ -110,6 +128,8 @@ static celix_status_t remoteServiceAdmin_getIpAddress(char* interface, char** ip static size_t remoteServiceAdmin_readCallback(void *ptr, size_t size, size_t nmemb, void *userp); static size_t remoteServiceAdmin_write(void *contents, size_t size, size_t nmemb, void *userp); static void remoteServiceAdmin_log(remote_service_admin_t *admin, int level, const char *file, int line, const char *msg, ...); +static void remoteServiceAdmin_setupStopExportsThread(remote_service_admin_t* admin); +static void remoteServiceAdmin_teardownStopExportsThread(remote_service_admin_t* admin); static void remoteServiceAdmin_curlshare_lock(CURL *handle, curl_lock_data data, curl_lock_access laccess, void *userptr) { @@ -220,6 +240,8 @@ celix_status_t remoteServiceAdmin_create(celix_bundle_context_t *context, remote status = EPERM; } + remoteServiceAdmin_setupStopExportsThread(*admin); + // Prepare callbacks structure. We have only one callback, the rest are NULL. 
struct mg_callbacks callbacks; memset(&callbacks, 0, sizeof(callbacks)); @@ -285,6 +307,52 @@ celix_status_t remoteServiceAdmin_destroy(remote_service_admin_t **admin) return status; } +void* remoteServiceAdmin_stopExportsThread(void *data) { + remote_service_admin_t* admin = data; + bool active = true; + + while (active) { + celixThreadMutex_lock(&admin->stopExportsMutex); + if (admin->stopExportsActive && celix_arrayList_size(admin->stopExports) == 0) { + celixThreadCondition_timedwaitRelative(&admin->stopExportsCond, &admin->stopExportsMutex, 1, 0); + } + for (int i = 0; i < celix_arrayList_size(admin->stopExports); ++i) { + export_registration_t *export = celix_arrayList_get(admin->stopExports, i); + exportRegistration_stop(export); + exportRegistration_destroy(export); + } + celix_arrayList_clear(admin->stopExports); + active = admin->stopExportsActive; + celixThreadMutex_unlock(&admin->stopExportsMutex); + } + + return NULL; +} + +static void remoteServiceAdmin_setupStopExportsThread(remote_service_admin_t* admin) { + if (CELIX_RSA_USE_STOP_EXPORT_THREAD) { + //setup exports stop thread + celixThreadMutex_create(&admin->stopExportsMutex, NULL); + admin->stopExports = celix_arrayList_create(); + celixThreadCondition_init(&admin->stopExportsCond, NULL); + admin->stopExportsActive = true; + celixThread_create(&admin->stopExportsThread, NULL, remoteServiceAdmin_stopExportsThread, admin); + } +} + +static void remoteServiceAdmin_teardownStopExportsThread(remote_service_admin_t* admin) { + if (CELIX_RSA_USE_STOP_EXPORT_THREAD) { + celixThreadMutex_lock(&admin->stopExportsMutex); + admin->stopExportsActive = false; + celixThreadCondition_broadcast(&admin->stopExportsCond); + celixThreadMutex_unlock(&admin->stopExportsMutex); + celixThread_join(admin->stopExportsThread, NULL); + celix_arrayList_destroy(admin->stopExports); + celixThreadMutex_destroy(&admin->stopExportsMutex); + celixThreadCondition_destroy(&admin->stopExportsCond); + } +} + celix_status_t 
remoteServiceAdmin_stop(remote_service_admin_t *admin) { celix_status_t status = CELIX_SUCCESS; @@ -298,8 +366,16 @@ celix_status_t remoteServiceAdmin_stop(remote_service_admin_t *admin) { for (i = 0; i < arrayList_size(exports); i++) { export_registration_t *export = arrayList_get(exports, i); if (export != NULL) { - exportRegistration_stop(export); - exportRegistration_destroy(export); + if (CELIX_RSA_USE_STOP_EXPORT_THREAD) { + celixThreadMutex_lock(&admin->stopExportsMutex); + exportRegistration_setActive(export, false); + celix_arrayList_add(admin->stopExports, export); + celixThreadCondition_broadcast(&admin->stopExportsCond); + celixThreadMutex_unlock(&admin->stopExportsMutex); + } else { + exportRegistration_stop(export); + exportRegistration_destroy(export); + } } } arrayList_destroy(exports); @@ -307,6 +383,8 @@ celix_status_t remoteServiceAdmin_stop(remote_service_admin_t *admin) { hashMapIterator_destroy(iter); celixThreadRwlock_unlock(&admin->exportedServicesLock); + remoteServiceAdmin_teardownStopExportsThread(admin); + celixThreadMutex_lock(&admin->importedServicesLock); int i; int size = arrayList_size(admin->importedServices); @@ -555,8 +633,16 @@ celix_status_t remoteServiceAdmin_removeExportedService(remote_service_admin_t * arrayList_destroy(exports); } - exportRegistration_close(registration); - exportRegistration_destroy(registration); + if (CELIX_RSA_USE_STOP_EXPORT_THREAD) { + celixThreadMutex_lock(&admin->stopExportsMutex); + exportRegistration_setActive(registration, false); + celix_arrayList_add(admin->stopExports, registration); + celixThreadCondition_broadcast(&admin->stopExportsCond); + celixThreadMutex_unlock(&admin->stopExportsMutex); + } else { + exportRegistration_stop(registration); + exportRegistration_destroy(registration); + } celixThreadRwlock_unlock(&admin->exportedServicesLock);
