This is an automated email from the ASF dual-hosted git repository. abroekhuis pushed a commit to branch feature/rsa_interceptors in repository https://gitbox.apache.org/repos/asf/celix.git
commit f82269549a698af610bc7d5f938c6d4bdf1b4856 Author: Alexander Broekhuis <[email protected]> AuthorDate: Mon Apr 6 19:47:58 2020 +0200 Added interceptors to RSA, currently only DFI is updated. --- CMakeLists.txt | 4 +- bundles/remote_services/examples/CMakeLists.txt | 4 + .../interceptors}/CMakeLists.txt | 20 ++- .../include/first_interceptor_private.h | 41 +++++ .../include/second_interceptor_private.h | 37 +++++ .../examples/interceptors/src/first_interceptor.c | 69 ++++++++ .../interceptors/src/rs_interceptor_activator.c | 94 +++++++++++ .../examples/interceptors/src/second_interceptor.c | 60 +++++++ .../src/export_registration_dfi.c | 48 ++++-- .../src/export_registration_dfi.h | 2 +- .../src/import_registration_dfi.c | 36 +++-- .../src/import_registration_dfi.h | 2 +- .../src/remote_service_admin_dfi.c | 35 ++++- bundles/remote_services/rsa_common/CMakeLists.txt | 2 +- .../rsa_common/src/remote_interceptors_handler.c | 174 +++++++++++++++++++++ .../rsa_spi/include/remote_interceptor.h | 37 +++++ .../rsa_spi/include/remote_interceptors_handler.h | 40 +++++ 17 files changed, 662 insertions(+), 43 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 16013ca..665eb92 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -140,8 +140,8 @@ install(EXPORT celix NAMESPACE Celix:: DESTINATION share/celix/cmake FILE Target install_celix_targets(celix NAMESPACE Celix:: DESTINATION share/celix/cmake FILE CelixTargets.cmake COMPONENT cmake) #install celix cmake modules -install(DIRECTORY ${CMAKE_SOURCE_DIR}/cmake/Modules/ DESTINATION share/celix/cmake/Modules) -install(DIRECTORY ${CMAKE_SOURCE_DIR}/cmake/cmake_celix/ DESTINATION share/celix/cmake/cmake_celix) +install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules/ DESTINATION share/celix/cmake/Modules) +install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/cmake/cmake_celix/ DESTINATION share/celix/cmake/cmake_celix) #configure and install CelixConfig and CelixConfigVersion files 
configure_file(cmake/CelixConfigVersion.cmake.in diff --git a/bundles/remote_services/examples/CMakeLists.txt b/bundles/remote_services/examples/CMakeLists.txt index 17dcabf..8f1b029 100644 --- a/bundles/remote_services/examples/CMakeLists.txt +++ b/bundles/remote_services/examples/CMakeLists.txt @@ -24,6 +24,8 @@ if (RSA_EXAMPLES) add_subdirectory(remote_example_api) add_subdirectory(remote_example_service) + add_subdirectory(interceptors) + # TODO refactor shm remote service admin to use dfi # if (BUILD_RSA_REMOTE_SERVICE_ADMIN_SHM AND BUILD_RSA_DISCOVERY_SHM) @@ -59,6 +61,7 @@ if (RSA_EXAMPLES) Celix::shell_tui Celix::log_service Celix::log_writer_stdout + celix_remote_interceptors_example calculator PROPERTIES RSA_PORT=18888 @@ -74,6 +77,7 @@ if (RSA_EXAMPLES) Celix::log_service Celix::log_writer_stdout Celix::rsa_discovery_etcd + celix_remote_interceptors_example calculator_shell PROPERTIES RSA_PORT=28888 diff --git a/bundles/remote_services/rsa_common/CMakeLists.txt b/bundles/remote_services/examples/interceptors/CMakeLists.txt similarity index 59% copy from bundles/remote_services/rsa_common/CMakeLists.txt copy to bundles/remote_services/examples/interceptors/CMakeLists.txt index 397fa8c..0b9cb35 100644 --- a/bundles/remote_services/rsa_common/CMakeLists.txt +++ b/bundles/remote_services/examples/interceptors/CMakeLists.txt @@ -15,16 +15,14 @@ # specific language governing permissions and limitations # under the License. 
-add_library(rsa_common STATIC - src/endpoint_description.c - src/export_registration_impl.c - src/import_registration_impl.c +add_celix_bundle(celix_remote_interceptors_example + SYMBOLIC_NAME "celix_remote_interceptors_example" + VERSION "1.0.0" + SOURCES + src/rs_interceptor_activator.c + src/first_interceptor.c + src/second_interceptor.c ) -set_target_properties(rsa_common PROPERTIES OUTPUT_NAME "celix_rsa_common") -target_include_directories(rsa_common PRIVATE src) -target_link_libraries(rsa_common PUBLIC Celix::framework Celix::rsa_spi Celix::log_helper) -#install(TARGETS rsa_common EXPORT celix COMPONENT rsa DESTINATION ${CMAKE_INSTALL_LIBDIR}) - -#Setup target aliases to match external usage -add_library(Celix::rsa_common ALIAS rsa_common) \ No newline at end of file +target_link_libraries(celix_remote_interceptors_example PRIVATE Celix::framework Celix::rsa_common) +target_include_directories(celix_remote_interceptors_example PRIVATE include) \ No newline at end of file diff --git a/bundles/remote_services/examples/interceptors/include/first_interceptor_private.h b/bundles/remote_services/examples/interceptors/include/first_interceptor_private.h new file mode 100644 index 0000000..0b34fc5 --- /dev/null +++ b/bundles/remote_services/examples/interceptors/include/first_interceptor_private.h @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef CELIX_FIRST_INTERCEPTOR_PRIVATE_H +#define CELIX_FIRST_INTERCEPTOR_PRIVATE_H + +#include <stdint.h> + +#include "remote_interceptor.h" + +typedef struct first_interceptor { + uint64_t sequenceNumber; + +} first_interceptor_t; + +static const char *const SEQUENCE_NUMBER = "sequence.number"; + +celix_status_t firstInterceptor_create(first_interceptor_t **interceptor); +celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor); + +bool firstInterceptor_preExportCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata); +void firstInterceptor_postExportCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata); +bool firstInterceptor_preProxyCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata); +void firstInterceptor_postProxyCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata); + +#endif //CELIX_FIRST_INTERCEPTOR_PRIVATE_H diff --git a/bundles/remote_services/examples/interceptors/include/second_interceptor_private.h b/bundles/remote_services/examples/interceptors/include/second_interceptor_private.h new file mode 100644 index 0000000..dc6b97b --- /dev/null +++ b/bundles/remote_services/examples/interceptors/include/second_interceptor_private.h @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef CELIX_SECOND_INTERCEPTOR_PRIVATE_H +#define CELIX_SECOND_INTERCEPTOR_PRIVATE_H + +#include <stdint.h> + +#include "remote_interceptor.h" + +typedef struct second_interceptor { +} second_interceptor_t; + +celix_status_t secondInterceptor_create(second_interceptor_t **interceptor); +celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor); + +bool secondInterceptor_preExportCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata); +void secondInterceptor_postExportCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata); +bool secondInterceptor_preProxyCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata); +void secondInterceptor_postProxyCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata); + +#endif //CELIX_SECOND_INTERCEPTOR_PRIVATE_H diff --git a/bundles/remote_services/examples/interceptors/src/first_interceptor.c 
b/bundles/remote_services/examples/interceptors/src/first_interceptor.c new file mode 100644 index 0000000..384f7cd --- /dev/null +++ b/bundles/remote_services/examples/interceptors/src/first_interceptor.c @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#include "first_interceptor_private.h" + +#include <stdlib.h> +#include <inttypes.h> + +celix_status_t firstInterceptor_create(first_interceptor_t **interceptor) { + celix_status_t status = CELIX_SUCCESS; + + *interceptor = calloc(1, sizeof(**interceptor)); + if (!*interceptor) { + status = CELIX_ENOMEM; + } else { + (*interceptor)->sequenceNumber = 0; + } + + return status; +} + +celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor) { + free(interceptor); + return CELIX_SUCCESS; +} + + +bool firstInterceptor_preExportCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata) { + uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0); + printf("Invoked preExportCall on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence); + + return true; +} + +void firstInterceptor_postExportCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata) { + uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0); + printf("Invoked postExportCall on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence); +} + +bool firstInterceptor_preProxyCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata) { + first_interceptor_t *interceptor = handle; + celix_properties_setLong((celix_properties_t *) metadata, SEQUENCE_NUMBER, interceptor->sequenceNumber++); + + uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0); + printf("Invoked preProxyCall on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence); + + return true; +} + +void firstInterceptor_postProxyCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata) { + uint64_t 
sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0); + printf("Invoked postProxyCall on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence); +} + diff --git a/bundles/remote_services/examples/interceptors/src/rs_interceptor_activator.c b/bundles/remote_services/examples/interceptors/src/rs_interceptor_activator.c new file mode 100644 index 0000000..d0f3991 --- /dev/null +++ b/bundles/remote_services/examples/interceptors/src/rs_interceptor_activator.c @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#include "celix_api.h" + +#include "first_interceptor_private.h" +#include "second_interceptor_private.h" + +#include <string.h> + +struct interceptorActivator { + first_interceptor_t *interceptor; + uint64_t interceptorSvcId; + + second_interceptor_t *secondInterceptor; + uint64_t secondInterceptorSvcId; +}; + +static int interceptor_start(struct interceptorActivator *act, celix_bundle_context_t *ctx) { + remote_interceptor_t *interceptorSvc = calloc(1,sizeof(*interceptorSvc)); + first_interceptor_t *interceptor = NULL; + firstInterceptor_create(&interceptor); + + interceptorSvc->handle = interceptor; + interceptorSvc->preProxyCall = firstInterceptor_preProxyCall; + interceptorSvc->postProxyCall = firstInterceptor_postProxyCall; + interceptorSvc->preExportCall = firstInterceptor_preExportCall; + interceptorSvc->postExportCall = firstInterceptor_postExportCall; + + act->interceptor = interceptor; + + celix_properties_t *props = celix_properties_create(); + celix_properties_setLong(props, OSGI_FRAMEWORK_SERVICE_RANKING, 10); + + celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS; + opts.svc = interceptorSvc; + opts.serviceName = REMOTE_INTERCEPTOR_SERVICE_NAME; + opts.serviceVersion = REMOTE_INTERCEPTOR_SERVICE_VERSION; + opts.properties = props; + + act->interceptorSvcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts); + + remote_interceptor_t *secondInterceptorSvc = calloc(1, sizeof(*secondInterceptorSvc)); + second_interceptor_t *secondInterceptor = NULL; + secondInterceptor_create(&secondInterceptor); + + secondInterceptorSvc->handle = secondInterceptor; + secondInterceptorSvc->preProxyCall = secondInterceptor_preProxyCall; + secondInterceptorSvc->postProxyCall = secondInterceptor_postProxyCall; + secondInterceptorSvc->preExportCall = secondInterceptor_preExportCall; + secondInterceptorSvc->postExportCall = secondInterceptor_postExportCall; + + act->secondInterceptor = secondInterceptor; + + celix_properties_t 
*secondProps = celix_properties_create(); + celix_properties_setLong(secondProps, OSGI_FRAMEWORK_SERVICE_RANKING, 20); + + celix_service_registration_options_t secondOpts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS; + secondOpts.svc = secondInterceptorSvc; + secondOpts.serviceName = REMOTE_INTERCEPTOR_SERVICE_NAME; + secondOpts.serviceVersion = REMOTE_INTERCEPTOR_SERVICE_VERSION; + secondOpts.properties = secondProps; + + act->secondInterceptorSvcId = celix_bundleContext_registerServiceWithOptions(ctx, &secondOpts); + + return 0; +} + +static int interceptor_stop(struct interceptorActivator *act, celix_bundle_context_t *ctx) { + celix_bundleContext_unregisterService(ctx, act->interceptorSvcId); + firstInterceptor_destroy(act->interceptor); + + celix_bundleContext_unregisterService(ctx, act->secondInterceptorSvcId); + secondInterceptor_destroy(act->secondInterceptor); + + return 0; +} + +CELIX_GEN_BUNDLE_ACTIVATOR(struct interceptorActivator, interceptor_start, interceptor_stop) \ No newline at end of file diff --git a/bundles/remote_services/examples/interceptors/src/second_interceptor.c b/bundles/remote_services/examples/interceptors/src/second_interceptor.c new file mode 100644 index 0000000..2aea101 --- /dev/null +++ b/bundles/remote_services/examples/interceptors/src/second_interceptor.c @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "second_interceptor_private.h" + +#include <stdlib.h> + +celix_status_t secondInterceptor_create(second_interceptor_t **interceptor) { + celix_status_t status = CELIX_SUCCESS; + + *interceptor = calloc(1, sizeof(**interceptor)); + if (!*interceptor) { + status = CELIX_ENOMEM; + } else { + } + + return status; +} + +celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor) { + free(interceptor); + return CELIX_SUCCESS; +} + + +bool secondInterceptor_preExportCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata) { + printf("Invoked preExportCall on second interceptor\n"); + + return true; +} + +void secondInterceptor_postExportCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata) { + printf("Invoked postExportCall on second interceptor\n"); +} + +bool secondInterceptor_preProxyCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata) { + printf("Invoked preProxyCall on second interceptor\n"); + + return true; +} + +void secondInterceptor_postProxyCall(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata) { + printf("Invoked postProxyCall on second interceptor\n"); +} + diff --git a/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.c 
b/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.c index b1cc1f2..36b6f3f 100644 --- a/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.c +++ b/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.c @@ -28,6 +28,7 @@ #include "celix_constants.h" #include "export_registration_dfi.h" #include "dfi_utils.h" +#include "remote_interceptors_handler.h" struct export_reference { endpoint_description_t *endpoint; //owner @@ -47,6 +48,8 @@ struct export_registration { //TODO add tracker and lock bool closed; + remote_interceptors_handler_t *interceptorsHandler; + FILE *logFile; }; @@ -80,6 +83,8 @@ celix_status_t exportRegistration_create(log_helper_t *helper, service_reference reg->logFile = logFile; reg->servId = strndup(servId, 1024); + remoteInterceptorsHandler_create(context, &reg->interceptorsHandler); + celixThreadMutex_create(&reg->mutex, NULL); } @@ -129,22 +134,36 @@ celix_status_t exportRegistration_create(log_helper_t *helper, service_reference return status; } -celix_status_t exportRegistration_call(export_registration_t *export, char *data, int datalength, char **responseOut, int *responseLength) { +celix_status_t exportRegistration_call(export_registration_t *export, char *data, int datalength, celix_properties_t *metadata, char **responseOut, int *responseLength) { int status = CELIX_SUCCESS; *responseLength = -1; - celixThreadMutex_lock(&export->mutex); - status = jsonRpc_call(export->intf, export->service, data, responseOut); - celixThreadMutex_unlock(&export->mutex); - - //printf("calling for '%s'\n"); - if (export->logFile != NULL) { - static int callCount = 0; - char *name = NULL; - dynInterface_getName(export->intf, &name); - fprintf(export->logFile, "REMOTE CALL %i\n\tservice=%s\n\tservice_id=%s\n\trequest_payload=%s\n\tstatus=%i\n", callCount, name, export->servId, data, status); - fflush(export->logFile); - callCount += 1; + json_error_t error; + json_t *js_request = 
json_loads(data, 0, &error); + const char *sig; + if (js_request) { + if (json_unpack(js_request, "{s:s}", "m", &sig) == 0) { + bool cont = remoteInterceptorHandler_invokePreExportCall(export->interceptorsHandler, "TODO", export->exportReference.endpoint->properties, sig, &metadata); + if (cont) { + celixThreadMutex_lock(&export->mutex); + status = jsonRpc_call(export->intf, export->service, data, responseOut); + celixThreadMutex_unlock(&export->mutex); + + remoteInterceptorHandler_invokePostExportCall(export->interceptorsHandler, "TODO", export->exportReference.endpoint->properties, sig, metadata); + } + + //printf("calling for '%s'\n"); + if (export->logFile != NULL) { + static int callCount = 0; + char *name = NULL; + dynInterface_getName(export->intf, &name); + fprintf(export->logFile, "REMOTE CALL %i\n\tservice=%s\n\tservice_id=%s\n\trequest_payload=%s\n\tstatus=%i\n", callCount, name, export->servId, data, status); + fflush(export->logFile); + callCount += 1; + } + } + } else { + status = CELIX_ILLEGAL_ARGUMENT; } return status; @@ -197,6 +216,9 @@ void exportRegistration_destroy(export_registration_t *reg) { if (reg->servId != NULL) { free(reg->servId); } + + remoteInterceptorsHandler_destroy(reg->interceptorsHandler); + celixThreadMutex_destroy(&reg->mutex); free(reg); diff --git a/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.h b/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.h index d1fe9a3..07ede3d 100644 --- a/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.h +++ b/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.h @@ -32,7 +32,7 @@ void exportRegistration_destroy(export_registration_t *registration); celix_status_t exportRegistration_start(export_registration_t *registration); celix_status_t exportRegistration_stop(export_registration_t *registration); -celix_status_t exportRegistration_call(export_registration_t *export, char *data, int 
datalength, char **response, int *responseLength); +celix_status_t exportRegistration_call(export_registration_t *export, char *data, int datalength, celix_properties_t *metadata, char **response, int *responseLength); #endif //CELIX_EXPORT_REGISTRATION_DFI_H diff --git a/bundles/remote_services/remote_service_admin_dfi/src/import_registration_dfi.c b/bundles/remote_services/remote_service_admin_dfi/src/import_registration_dfi.c index 55a3810..e639459 100644 --- a/bundles/remote_services/remote_service_admin_dfi/src/import_registration_dfi.c +++ b/bundles/remote_services/remote_service_admin_dfi/src/import_registration_dfi.c @@ -27,6 +27,7 @@ #include "import_registration.h" #include "import_registration_dfi.h" #include "remote_service_admin_dfi.h" +#include "remote_interceptors_handler.h" #include "remote_service_admin_dfi_constants.h" struct import_registration { @@ -45,6 +46,8 @@ struct import_registration { hash_map_pt proxies; //key -> bundle, value -> service_proxy celix_thread_mutex_t proxiesMutex; //protects proxies + remote_interceptors_handler_t *interceptorsHandler; + FILE *logFile; }; @@ -78,6 +81,8 @@ celix_status_t importRegistration_create(celix_bundle_context_t *context, endpoi reg->classObject = classObject; reg->proxies = hashMap_create(NULL, NULL, NULL, NULL); + remoteInterceptorsHandler_create(context, &reg->interceptorsHandler); + celixThreadMutex_create(&reg->mutex, NULL); celixThreadMutex_create(&reg->proxiesMutex, NULL); status = version_createVersionFromString((char*)serviceVersion,&(reg->version)); @@ -136,6 +141,8 @@ void importRegistration_destroy(import_registration_t *import) { import->proxies = NULL; } + remoteInterceptorsHandler_destroy(import->interceptorsHandler); + pthread_mutex_destroy(&import->mutex); pthread_mutex_destroy(&import->proxiesMutex); @@ -333,19 +340,25 @@ static void importRegistration_proxyFunc(void *userData, void *args[], void *ret char *reply = NULL; int rc = 0; //printf("sending request\n"); - 
celixThreadMutex_lock(&import->mutex); - if (import->send != NULL) { - import->send(import->sendHandle, import->endpoint, invokeRequest, &reply, &rc); - } - celixThreadMutex_unlock(&import->mutex); - //printf("request sended. got reply '%s' with status %i\n", reply, rc); + celix_properties_t *metadata = NULL; + bool cont = remoteInterceptorHandler_invokePreProxyCall(import->interceptorsHandler, "TODO", import->endpoint->properties, entry->name, &metadata); + if (cont) { + celixThreadMutex_lock(&import->mutex); + if (import->send != NULL) { + import->send(import->sendHandle, import->endpoint, invokeRequest, metadata, &reply, &rc); + } + celixThreadMutex_unlock(&import->mutex); + //printf("request sended. got reply '%s' with status %i\n", reply, rc); - if (rc == 0 && dynFunction_hasReturn(entry->dynFunc)) { - //fjprintf("Handling reply '%s'\n", reply); - status = jsonRpc_handleReply(entry->dynFunc, reply, args); - } + if (rc == 0 && dynFunction_hasReturn(entry->dynFunc)) { + //fjprintf("Handling reply '%s'\n", reply); + status = jsonRpc_handleReply(entry->dynFunc, reply, args); + } - *(int *) returnVal = rc; + *(int *) returnVal = rc; + + remoteInterceptorHandler_invokePostProxyCall(import->interceptorsHandler, "TODO", import->endpoint->properties, entry->name, metadata); + } if (import->logFile != NULL) { static int callCount = 0; @@ -440,3 +453,4 @@ static const char* importRegistration_getUrl(import_registration_t *reg) { static const char* importRegistration_getServiceName(import_registration_t *reg) { return reg->endpoint->service; } + diff --git a/bundles/remote_services/remote_service_admin_dfi/src/import_registration_dfi.h b/bundles/remote_services/remote_service_admin_dfi/src/import_registration_dfi.h index 1d68a7c..c99fcee 100644 --- a/bundles/remote_services/remote_service_admin_dfi/src/import_registration_dfi.h +++ b/bundles/remote_services/remote_service_admin_dfi/src/import_registration_dfi.h @@ -25,7 +25,7 @@ #include <celix_errno.h> -typedef void 
(*send_func_type)(void *handle, endpoint_description_t *endpointDescription, char *request, char **reply, int* replyStatus); +typedef void (*send_func_type)(void *handle, endpoint_description_t *endpointDescription, char *request, celix_properties_t *metadata, char **reply, int* replyStatus); celix_status_t importRegistration_create(celix_bundle_context_t *context, endpoint_description_t *description, const char *classObject, const char* serviceVersion, FILE *logFile, import_registration_t **import); diff --git a/bundles/remote_services/remote_service_admin_dfi/src/remote_service_admin_dfi.c b/bundles/remote_services/remote_service_admin_dfi/src/remote_service_admin_dfi.c index fa69e0f..4c95c07 100644 --- a/bundles/remote_services/remote_service_admin_dfi/src/remote_service_admin_dfi.c +++ b/bundles/remote_services/remote_service_admin_dfi/src/remote_service_admin_dfi.c @@ -105,7 +105,7 @@ static const unsigned int DEFAULT_TIMEOUT = 0; static int remoteServiceAdmin_callback(struct mg_connection *conn); static celix_status_t remoteServiceAdmin_createEndpointDescription(remote_service_admin_t *admin, service_reference_pt reference, celix_properties_t *props, char *interface, endpoint_description_t **description); -static celix_status_t remoteServiceAdmin_send(void *handle, endpoint_description_t *endpointDescription, char *request, char **reply, int* replyStatus); +static celix_status_t remoteServiceAdmin_send(void *handle, endpoint_description_t *endpointDescription, char *request, celix_properties_t *metadata, char **reply, int* replyStatus); static celix_status_t remoteServiceAdmin_getIpAddress(char* interface, char** ip); static size_t remoteServiceAdmin_readCallback(void *ptr, size_t size, size_t nmemb, void *userp); static size_t remoteServiceAdmin_write(void *contents, size_t size, size_t nmemb, void *userp); @@ -365,6 +365,18 @@ static int remoteServiceAdmin_callback(struct mg_connection *conn) { service[pos] = '\0'; unsigned long serviceId = 
strtoul(service,NULL,10); + celix_properties_t *metadata = NULL; + + for (int i = 0; i < request_info->num_headers; i++) { + struct mg_header header = request_info->http_headers[i]; + if (strncmp(header.name, "X-RSA-Metadata-", 15) == 0) { + if (metadata == NULL) { + metadata = celix_properties_create(); + } + celix_properties_set(metadata, header.name + 15, header.value); + } + } + celixThreadRwlock_readLock(&rsa->exportedServicesLock); //find endpoint @@ -399,7 +411,7 @@ static int remoteServiceAdmin_callback(struct mg_connection *conn) { char *response = NULL; int responceLength = 0; - int rc = exportRegistration_call(export, data, -1, &response, &responceLength); + int rc = exportRegistration_call(export, data, -1, metadata, &response, &responceLength); if (rc != CELIX_SUCCESS) { RSA_LOG_ERROR(rsa, "Error trying to invoke remove service, got error %i\n", rc); } @@ -774,7 +786,7 @@ celix_status_t remoteServiceAdmin_removeImportedService(remote_service_admin_t * return status; } -static celix_status_t remoteServiceAdmin_send(void *handle, endpoint_description_t *endpointDescription, char *request, char **reply, int* replyStatus) { +static celix_status_t remoteServiceAdmin_send(void *handle, endpoint_description_t *endpointDescription, char *request, celix_properties_t *metadata, char **reply, int* replyStatus) { remote_service_admin_t * rsa = handle; struct post post; post.readptr = request; @@ -813,6 +825,22 @@ static celix_status_t remoteServiceAdmin_send(void *handle, endpoint_description if(!curl) { status = CELIX_ILLEGAL_STATE; } else { + struct curl_slist *metadataHeader = NULL; + if (metadata != NULL && celix_properties_size(metadata) > 0) { + const char *key = NULL; + CELIX_PROPERTIES_FOR_EACH(metadata, key) { + const char *val = celix_properties_get(metadata, key, ""); + size_t length = strlen(key) + strlen(val) + 18; // "X-RSA-Metadata-key: val\0" + + char header[length]; + + snprintf(header, length, "X-RSA-Metadata-%s: %s", key, val); + metadataHeader 
= curl_slist_append(metadataHeader, header); + } + + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, metadataHeader); + } + curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1); curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout); curl_easy_setopt(curl, CURLOPT_URL, url); @@ -830,6 +858,7 @@ static celix_status_t remoteServiceAdmin_send(void *handle, endpoint_description *replyStatus = res; curl_easy_cleanup(curl); + curl_slist_free_all(metadataHeader); } return status; diff --git a/bundles/remote_services/rsa_common/CMakeLists.txt b/bundles/remote_services/rsa_common/CMakeLists.txt index 397fa8c..0f48215 100644 --- a/bundles/remote_services/rsa_common/CMakeLists.txt +++ b/bundles/remote_services/rsa_common/CMakeLists.txt @@ -19,7 +19,7 @@ add_library(rsa_common STATIC src/endpoint_description.c src/export_registration_impl.c src/import_registration_impl.c -) + src/remote_interceptors_handler.c) set_target_properties(rsa_common PROPERTIES OUTPUT_NAME "celix_rsa_common") target_include_directories(rsa_common PRIVATE src) target_link_libraries(rsa_common PUBLIC Celix::framework Celix::rsa_spi Celix::log_helper) diff --git a/bundles/remote_services/rsa_common/src/remote_interceptors_handler.c b/bundles/remote_services/rsa_common/src/remote_interceptors_handler.c new file mode 100644 index 0000000..8fbeda9 --- /dev/null +++ b/bundles/remote_services/rsa_common/src/remote_interceptors_handler.c @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include <stdlib.h> + +#include "celix_bundle_context.h" +#include "celix_constants.h" +#include "utils.h" + +#include "remote_interceptors_handler.h" + +typedef struct entry { + const celix_properties_t *properties; + remote_interceptor_t *interceptor; +} entry_t; + +struct remote_interceptors_handler { + celix_array_list_t *interceptors; + + long interceptorsTrackerId; + + celix_bundle_context_t *ctx; +}; + +static int referenceCompare(const void *a, const void *b); + +static void remoteInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props); +static void remoteInterceptorsHandler_removeInterceptor(void *handle, void *svc, const celix_properties_t *props); + +celix_status_t remoteInterceptorsHandler_create(celix_bundle_context_t *ctx, remote_interceptors_handler_t **handler) { + celix_status_t status = CELIX_SUCCESS; + + *handler = calloc(1, sizeof(**handler)); + if (!*handler) { + status = CELIX_ENOMEM; + } else { + (*handler)->ctx = ctx; + + (*handler)->interceptors = celix_arrayList_create(); + + // Create service tracker here, and not in the activator + celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; + opts.filter.serviceName = REMOTE_INTERCEPTOR_SERVICE_NAME; + opts.filter.ignoreServiceLanguage = true; + opts.callbackHandle = *handler; + opts.addWithProperties = remoteInterceptorsHandler_addInterceptor; + opts.removeWithProperties = remoteInterceptorsHandler_removeInterceptor; + (*handler)->interceptorsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, 
&opts); + } + + return status; +} + +celix_status_t remoteInterceptorsHandler_destroy(remote_interceptors_handler_t *handler) { + celix_bundleContext_stopTracker(handler->ctx, handler->interceptorsTrackerId); + + celix_arrayList_destroy(handler->interceptors); + free(handler); + + return CELIX_SUCCESS; +} + +void remoteInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props) { + remote_interceptors_handler_t *handler = handle; + + bool exists = false; + for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) { + entry_t *entry = arrayList_get(handler->interceptors, i); + if (entry->interceptor == svc) { + exists = true; + } + } + if (!exists) { + entry_t *entry = calloc(1, sizeof(*entry)); + entry->properties = props; + entry->interceptor = svc; + celix_arrayList_add(handler->interceptors, entry); + + celix_arrayList_sort(handler->interceptors, referenceCompare); + } +} + +void remoteInterceptorsHandler_removeInterceptor(void *handle, void *svc, __attribute__((unused)) const celix_properties_t *props) { + remote_interceptors_handler_t *handler = handle; + for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) { + entry_t *entry = arrayList_get(handler->interceptors, i); + if (entry->interceptor == svc) { + arrayList_remove(handler->interceptors, i); + break; + } + } +} + +bool remoteInterceptorHandler_invokePreExportCall(remote_interceptors_handler_t *handler, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t **metadata) { + bool cont = true; + + if (*metadata == NULL && arrayList_size(handler->interceptors) > 0) { + *metadata = celix_properties_create(); + } + + for (uint32_t i = arrayList_size(handler->interceptors); i > 0; i--) { + entry_t *entry = arrayList_get(handler->interceptors, i - 1); + + cont = entry->interceptor->preExportCall(entry->interceptor->handle, rsaType, svcProperties, functionName, *metadata); + if (!cont) { + break; + } + } 
+ + return cont; +} + +void remoteInterceptorHandler_invokePostExportCall(remote_interceptors_handler_t *handler, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata) { + for (uint32_t i = arrayList_size(handler->interceptors); i > 0; i--) { + entry_t *entry = arrayList_get(handler->interceptors, i - 1); + + entry->interceptor->postExportCall(entry->interceptor->handle, rsaType, svcProperties, functionName, metadata); + } +} + +bool remoteInterceptorHandler_invokePreProxyCall(remote_interceptors_handler_t *handler, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t **metadata) { + bool cont = true; + + if (*metadata == NULL && arrayList_size(handler->interceptors) > 0) { + *metadata = celix_properties_create(); + } + + for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) { + entry_t *entry = arrayList_get(handler->interceptors, i); + + cont = entry->interceptor->preProxyCall(entry->interceptor->handle, rsaType, svcProperties, functionName, *metadata); + if (!cont) { + break; + } + } + + return cont; +} + +void remoteInterceptorHandler_invokePostProxyCall(remote_interceptors_handler_t *handler, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata) { + for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) { + entry_t *entry = arrayList_get(handler->interceptors, i); + + entry->interceptor->postProxyCall(entry->interceptor->handle, rsaType, svcProperties, functionName, metadata); + } +} + +int referenceCompare(const void *a, const void *b) { + const entry_t *aEntry = a; + const entry_t *bEntry = b; + + long servIdA = celix_properties_getAsLong(aEntry->properties, OSGI_FRAMEWORK_SERVICE_ID, 0); + long servIdB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_ID, 0); + + long servRankingA = celix_properties_getAsLong(aEntry->properties, 
OSGI_FRAMEWORK_SERVICE_RANKING, 0); + long servRankingB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0); + + return utils_compareServiceIdsAndRanking(servIdA, servRankingA, servIdB, servRankingB); +} \ No newline at end of file diff --git a/bundles/remote_services/rsa_spi/include/remote_interceptor.h b/bundles/remote_services/rsa_spi/include/remote_interceptor.h new file mode 100644 index 0000000..695b73a --- /dev/null +++ b/bundles/remote_services/rsa_spi/include/remote_interceptor.h @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#ifndef CELIX_REMOTE_INTERCEPTOR_H +#define CELIX_REMOTE_INTERCEPTOR_H + +#include <celix_properties.h> + +#define REMOTE_INTERCEPTOR_SERVICE_NAME "remote.interceptor" +#define REMOTE_INTERCEPTOR_SERVICE_VERSION "1.0.0" + +typedef struct remote_interceptor { + void *handle; + + bool (*preExportCall)(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata); + void (*postExportCall)(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata); + bool (*preProxyCall)(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata); + void (*postProxyCall)(void *handle, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata); +} remote_interceptor_t; + +#endif //CELIX_REMOTE_INTERCEPTOR_H diff --git a/bundles/remote_services/rsa_spi/include/remote_interceptors_handler.h b/bundles/remote_services/rsa_spi/include/remote_interceptors_handler.h new file mode 100644 index 0000000..01be2b8 --- /dev/null +++ b/bundles/remote_services/rsa_spi/include/remote_interceptors_handler.h @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef CELIX_REMOTE_INTERCEPTORS_HANDLER_H +#define CELIX_REMOTE_INTERCEPTORS_HANDLER_H + +#include <stdint.h> +#include <celix_types.h> + +#include "celix_errno.h" +#include "celix_array_list.h" +#include "remote_interceptor.h" +#include "celix_properties.h" + +typedef struct remote_interceptors_handler remote_interceptors_handler_t; + +celix_status_t remoteInterceptorsHandler_create(celix_bundle_context_t *ctx, remote_interceptors_handler_t **handler); +celix_status_t remoteInterceptorsHandler_destroy(remote_interceptors_handler_t *handler); + +bool remoteInterceptorHandler_invokePreExportCall(remote_interceptors_handler_t *handler, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t **metadata); +void remoteInterceptorHandler_invokePostExportCall(remote_interceptors_handler_t *handler, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata); +bool remoteInterceptorHandler_invokePreProxyCall(remote_interceptors_handler_t *handler, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t **metadata); +void remoteInterceptorHandler_invokePostProxyCall(remote_interceptors_handler_t *handler, const char *rsaType, const celix_properties_t *svcProperties, const char *functionName, celix_properties_t *metadata); + +#endif //CELIX_REMOTE_INTERCEPTORS_HANDLER_H
