This is an automated email from the ASF dual-hosted git repository. abroekhuis pushed a commit to branch feature/pubsub_inteceptors in repository https://gitbox.apache.org/repos/asf/celix.git
commit 6025682e24786bdceffe14a4b6e85fc6e3fdc303 Author: Alexander Broekhuis <[email protected]> AuthorDate: Wed Mar 25 08:11:26 2020 +0100 Added interceptor service and handling for PubSub ZMQ. Updated arraylist to have a sort function. --- bundles/pubsub/examples/CMakeLists.txt | 2 + bundles/pubsub/examples/pubsub/CMakeLists.txt | 1 + .../pubsub/{ => interceptors}/CMakeLists.txt | 19 +-- .../include/first_interceptor_private.h | 43 +++++ .../include/second_interceptor_private.h | 36 +++++ .../pubsub/interceptors/src/first_interceptor.c | 71 +++++++++ .../interceptors/src/ps_interceptor_activator.c | 94 +++++++++++ .../pubsub/interceptors/src/second_interceptor.c | 58 +++++++ .../src/pubsub_zmq_topic_receiver.c | 28 +++- .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 174 +++++++++++---------- bundles/pubsub/pubsub_spi/CMakeLists.txt | 2 +- .../pubsub/pubsub_spi/include/pubsub_interceptor.h | 46 ++++++ .../include/pubsub_interceptors_handler.h | 37 +++++ .../pubsub_spi/src/pubsub_interceptors_handler.c | 174 +++++++++++++++++++++ libs/utils/include/celix_array_list.h | 4 + libs/utils/src/array_list.c | 21 +++ 16 files changed, 716 insertions(+), 94 deletions(-) diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt index be55811..ccb674c 100644 --- a/bundles/pubsub/examples/CMakeLists.txt +++ b/bundles/pubsub/examples/CMakeLists.txt @@ -239,6 +239,7 @@ if (BUILD_PUBSUB_PSA_ZMQ) Celix::pubsub_admin_zmq celix_pubsub_poi_publisher celix_pubsub_poi_publisher2 + celix_pubsub_interceptors_example PROPERTIES PSA_ZMQ_VERBOSE=true PUBSUB_ETCD_DISCOVERY_VERBOSE=true @@ -258,6 +259,7 @@ if (BUILD_PUBSUB_PSA_ZMQ) Celix::pubsub_topology_manager Celix::pubsub_admin_zmq celix_pubsub_poi_subscriber + celix_pubsub_interceptors_example PROPERTIES PSA_ZMQ_VERBOSE=true PUBSUB_ETCD_DISCOVERY_VERBOSE=true diff --git a/bundles/pubsub/examples/pubsub/CMakeLists.txt b/bundles/pubsub/examples/pubsub/CMakeLists.txt index 8b5c653..427dbd1 100644 --- a/bundles/pubsub/examples/pubsub/CMakeLists.txt +++ b/bundles/pubsub/examples/pubsub/CMakeLists.txt @@ -17,6 +17,7 @@ include_directories("common/include") +add_subdirectory(interceptors) add_subdirectory(publisher) add_subdirectory(publisher2) if (BUILD_PUBSUB_PSA_WS) diff --git a/bundles/pubsub/examples/pubsub/CMakeLists.txt b/bundles/pubsub/examples/pubsub/interceptors/CMakeLists.txt similarity index 65% copy from bundles/pubsub/examples/pubsub/CMakeLists.txt copy to bundles/pubsub/examples/pubsub/interceptors/CMakeLists.txt index 8b5c653..1bf920b 100644 --- a/bundles/pubsub/examples/pubsub/CMakeLists.txt +++ b/bundles/pubsub/examples/pubsub/interceptors/CMakeLists.txt @@ -15,13 +15,14 @@ # specific language governing permissions and limitations # under the License. -include_directories("common/include") - -add_subdirectory(publisher) -add_subdirectory(publisher2) -if (BUILD_PUBSUB_PSA_WS) - add_subdirectory(pubsub_websocket) -endif() -add_subdirectory(subscriber) - +add_celix_bundle(celix_pubsub_interceptors_example + SYMBOLIC_NAME "celix_pubsub_interceptors_example" + VERSION "1.0.0" + SOURCES + src/ps_interceptor_activator.c + src/first_interceptor.c + src/second_interceptor.c +) +target_link_libraries(celix_pubsub_interceptors_example PRIVATE Celix::framework Celix::pubsub_spi) +target_include_directories(celix_pubsub_interceptors_example PRIVATE include) \ No newline at end of file diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h new file mode 100644 index 0000000..c7dd87a --- /dev/null +++ b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef CELIX_FIRST_INTERCEPTOR_PRIVATE_H +#define CELIX_FIRST_INTERCEPTOR_PRIVATE_H + +#include <celix_threads.h> +#include "pubsub_interceptor.h" + +typedef struct first_interceptor { + + celix_thread_mutex_t mutex; + + uint64_t sequenceNumber; + +} first_interceptor_t; + +static const char *const SEQUENCE_NUMBER = "sequence.number"; + +celix_status_t firstInterceptor_create(first_interceptor_t **interceptor); +celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor); + +bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); +void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); +bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); +void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); + +#endif //CELIX_FIRST_INTERCEPTOR_PRIVATE_H diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h new file mode 100644 index 0000000..979b2c7 --- /dev/null +++ b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef CELIX_SECOND_INTERCEPTOR_PRIVATE_H +#define CELIX_SECOND_INTERCEPTOR_PRIVATE_H + +#include "pubsub_interceptor.h" + +typedef struct second_interceptor { + +} second_interceptor_t; + +celix_status_t secondInterceptor_create(second_interceptor_t **interceptor); +celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor); + +bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); +void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); +bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); +void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); + +#endif //CELIX_SECOND_INTERCEPTOR_PRIVATE_H diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c new file mode 100644 index 0000000..64c63fe --- /dev/null +++ b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "first_interceptor_private.h" + +celix_status_t firstInterceptor_create(first_interceptor_t **interceptor) { + celix_status_t status = CELIX_SUCCESS; + + *interceptor = calloc(1, sizeof(**interceptor)); + if (!*interceptor) { + status = CELIX_ENOMEM; + } else { + (*interceptor)->sequenceNumber = 0; + + status = celixThreadMutex_create(&(*interceptor)->mutex, NULL); + } + + return status; +} + +celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor) { + free(interceptor); + return CELIX_SUCCESS; +} + + +bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) { + first_interceptor_t *interceptor = handle; + celixThreadMutex_lock(&interceptor->mutex); + + printf("Invoked preSend on first interceptor\n"); + + celix_properties_setLong(metadata, SEQUENCE_NUMBER, interceptor->sequenceNumber++); + + celixThreadMutex_unlock(&interceptor->mutex); + + return true; +} + +void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) { + uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0); + printf("Invoked postSend on first interceptor, for message with sequenceNumber [%llu]\n", sequence); +} + +bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) { + uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0); + printf("Invoked preReceive on first interceptor, for message with sequenceNumber [%llu]\n", sequence); + + return true; +} + +void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) { + uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0); + printf("Invoked postReceive on first interceptor, for message with sequenceNumber [%llu]\n", sequence); +} + diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c b/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c new file mode 100644 index 0000000..12a055c --- /dev/null +++ b/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "celix_api.h" + +#include "first_interceptor_private.h" +#include "second_interceptor_private.h" + +#include <string.h> + +struct interceptorActivator { + first_interceptor_t *interceptor; + uint64_t interceptorSvcId; + + second_interceptor_t *secondInterceptor; + uint64_t secondInterceptorSvcId; +}; + +static int interceptor_start(struct interceptorActivator *act, celix_bundle_context_t *ctx) { + pubsub_interceptor_t *interceptorSvc = calloc(1,sizeof(*interceptorSvc)); + first_interceptor_t *interceptor = NULL; + firstInterceptor_create(&interceptor); + + interceptorSvc->handle = interceptor; + interceptorSvc->preSend = firstInterceptor_preSend; + interceptorSvc->postSend = firstInterceptor_postSend; + interceptorSvc->preReceive = firstInterceptor_preReceive; + interceptorSvc->postReceive = firstInterceptor_postReceive; + + act->interceptor = interceptor; + + celix_properties_t *props = celix_properties_create(); + celix_properties_setLong(props, OSGI_FRAMEWORK_SERVICE_RANKING, 10); + + celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS; + opts.svc = interceptorSvc; + opts.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME; + opts.serviceVersion = PUBSUB_INTERCEPTOR_SERVICE_VERSION; + opts.properties = props; + + act->interceptorSvcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts); + + pubsub_interceptor_t *secondInterceptorSvc = calloc(1, sizeof(*secondInterceptorSvc)); + second_interceptor_t *secondInterceptor = NULL; + secondInterceptor_create(&secondInterceptor); + + secondInterceptorSvc->handle = secondInterceptor; + secondInterceptorSvc->preSend = secondInterceptor_preSend; + secondInterceptorSvc->postSend = secondInterceptor_postSend; + secondInterceptorSvc->preReceive = secondInterceptor_preReceive; + secondInterceptorSvc->postReceive = secondInterceptor_postReceive; + + act->secondInterceptor = secondInterceptor; + + celix_properties_t *secondProps = celix_properties_create(); + celix_properties_setLong(secondProps, OSGI_FRAMEWORK_SERVICE_RANKING, 20); + + celix_service_registration_options_t secondOpts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS; + secondOpts.svc = secondInterceptorSvc; + secondOpts.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME; + secondOpts.serviceVersion = PUBSUB_INTERCEPTOR_SERVICE_VERSION; + secondOpts.properties = secondProps; + + act->secondInterceptorSvcId = celix_bundleContext_registerServiceWithOptions(ctx, &secondOpts); + + return 0; +} + +static int interceptor_stop(struct interceptorActivator *act, celix_bundle_context_t *ctx) { + celix_bundleContext_unregisterService(ctx, act->interceptorSvcId); + firstInterceptor_destroy(act->interceptor); + + celix_bundleContext_unregisterService(ctx, act->secondInterceptorSvcId); + secondInterceptor_destroy(act->secondInterceptor); + + return 0; +} + +CELIX_GEN_BUNDLE_ACTIVATOR(struct interceptorActivator, interceptor_start, interceptor_stop) \ No newline at end of file diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c new file mode 100644 index 0000000..33f0dd3 --- /dev/null +++ b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "second_interceptor_private.h" + +celix_status_t secondInterceptor_create(second_interceptor_t **interceptor) { + celix_status_t status = CELIX_SUCCESS; + + *interceptor = calloc(1, sizeof(**interceptor)); + if (!*interceptor) { + status = CELIX_ENOMEM; + } else { + } + + return status; +} + +celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor) { + free(interceptor); + return CELIX_SUCCESS; +} + + +bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) { + printf("Invoked preSend on second interceptor\n"); + + return true; +} + +void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) { + printf("Invoked postSend on second interceptor\n"); +} + +bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) { + printf("Invoked preReceive on second interceptor\n"); + + return true; +} + +void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) { + printf("Invoked postReceive on second interceptor\n"); +} + diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c index 979d373..cbc2cf9 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c @@ -37,6 +37,8 @@ #include <uuid/uuid.h> #include <pubsub_admin_metrics.h> +#include "pubsub_interceptors_handler.h" + #include "celix_utils_api.h" #define PSA_ZMQ_RECV_TIMEOUT 1000 @@ -66,6 +68,8 @@ struct pubsub_zmq_topic_receiver { char *topic; bool metricsEnabled; + pubsub_interceptors_handler_t *interceptorsHandler; + void *zmqCtx; void *zmqSock; @@ -150,6 +154,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context receiver->topic = strndup(topic, 1024 * 1024); receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED); + pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler); #ifdef BUILD_WITH_ZMQ_SECURITY char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context); @@ -320,6 +325,8 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) { zmq_close(receiver->zmqSock); zmq_ctx_term(receiver->zmqCtx); + pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler); + free(receiver->scope); free(receiver->topic); } @@ -492,12 +499,23 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec clock_gettime(CLOCK_REALTIME, &endSer); } if (status == CELIX_SUCCESS) { - bool release = true; - svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, message->metadata.metadata, &release); - if (release) { - msgSer->freeMsg(msgSer->handle, deserializedMsg); + + const char *msgType = msgSer->msgName; + uint32_t msgId = message->header.msgId; + celix_properties_t *metadata = message->metadata.metadata; + bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata); + if (cont) { + bool release = true; + svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, + metadata, &release); + if (release) { + msgSer->freeMsg(msgSer->handle, deserializedMsg); + } + + pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata); + + updateReceiveCount += 1; } - updateReceiveCount += 1; } else { updateSerError += 1; L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope, receiver->topic); diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c index 50a7879..646c6d9 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c @@ -32,6 +32,7 @@ #include "pubsub_psa_zmq_constants.h" #include <uuid/uuid.h> #include "celix_constants.h" +#include "pubsub_interceptors_handler.h" #define FIRST_SEND_DELAY_IN_SECONDS 2 #define ZMQ_BIND_MAX_RETRY 10 @@ -56,6 +57,8 @@ struct pubsub_zmq_topic_sender { bool metricsEnabled; bool zeroCopyEnabled; + pubsub_interceptors_handler_t *interceptorsHandler; + char *scope; char *topic; char *url; @@ -142,6 +145,8 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create( sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED); sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED); + pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler); + //setting up zmq socket for ZMQ TopicSender { #ifdef BUILD_WITH_ZMQ_SECURITY @@ -318,6 +323,8 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) { celixThreadMutex_destroy(&sender->boundedServices.mutex); celixThreadMutex_destroy(&sender->zmq.mutex); + pubsubInterceptorsHandler_destroy(sender->interceptorsHandler); + free(sender->scope); free(sender->topic); free(sender->url); @@ -528,102 +535,111 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co } if (status == CELIX_SUCCESS /*ser ok*/) { - pubsub_protocol_message_t message; - message.payload.payload = serializedOutput; - message.payload.length = serializedOutputLen; - - void *payloadData = NULL; - size_t payloadLength = 0; - entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength); - - void *metadataData = NULL; - size_t metadataLength = 0; - if (metadata != NULL) { - message.metadata.metadata = metadata; - entry->protSer->encodeMetadata(entry->protSer->handle, &message, &metadataData, &metadataLength); + if (metadata == NULL) { + metadata = celix_properties_create(); } + celixThreadMutex_lock(&entry->sendLock); - message.header.msgId = msgTypeId; - message.header.msgMajorVersion = 0; - message.header.msgMinorVersion = 0; - message.header.payloadSize = payloadLength; - message.header.metadataSize = metadataLength; + bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata); + if (cont) { + pubsub_protocol_message_t message; + message.payload.payload = serializedOutput; + message.payload.length = serializedOutputLen; + + void *payloadData = NULL; + size_t payloadLength = 0; + entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength); + + void *metadataData = NULL; + size_t metadataLength = 0; + if (metadata != NULL) { + message.metadata.metadata = metadata; + entry->protSer->encodeMetadata(entry->protSer->handle, &message, &metadataData, &metadataLength); + } - void *headerData = NULL; - size_t headerLength = 0; + message.header.msgId = msgTypeId; + message.header.msgMajorVersion = 0; + message.header.msgMinorVersion = 0; + message.header.payloadSize = payloadLength; + message.header.metadataSize = metadataLength; - entry->protSer->encodeHeader(entry->protSer->handle, &message, &headerData, &headerLength); + void *headerData = NULL; + size_t headerLength = 0; - celixThreadMutex_lock(&entry->sendLock); + entry->protSer->encodeHeader(entry->protSer->handle, &message, &headerData, &headerLength); - errno = 0; - bool sendOk; - - if (bound->parent->zeroCopyEnabled) { - zmq_msg_t msg1; // Header - zmq_msg_t msg2; // Payload - zmq_msg_t msg3; // Metadata - void *socket = zsock_resolve(sender->zmq.socket); - - zmq_msg_init_data(&msg1, headerData, headerLength, psa_zmq_freeMsg, bound); - //send header - int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE); - if (rc == -1) { - L_WARN("Error sending header msg. %s", strerror(errno)); - zmq_msg_close(&msg1); - } - //send header - if (rc > 0) { - zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, bound); - int flags = 0; - if (metadataLength > 0) { - flags = ZMQ_SNDMORE; - } - rc = zmq_msg_send(&msg2, socket, flags); + errno = 0; + bool sendOk; + + if (bound->parent->zeroCopyEnabled) { + zmq_msg_t msg1; // Header + zmq_msg_t msg2; // Payload + zmq_msg_t msg3; // Metadata + void *socket = zsock_resolve(sender->zmq.socket); + + zmq_msg_init_data(&msg1, headerData, headerLength, psa_zmq_freeMsg, bound); + //send header + int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE); if (rc == -1) { - L_WARN("Error sending payload msg. %s", strerror(errno)); - zmq_msg_close(&msg2); + L_WARN("Error sending header msg. %s", strerror(errno)); + zmq_msg_close(&msg1); } - } - if (rc > 0 && metadataLength > 0) { - zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, bound); - rc = zmq_msg_send(&msg3, socket, 0); - if (rc == -1) { - L_WARN("Error sending metadata msg. %s", strerror(errno)); - zmq_msg_close(&msg3); + //send header + if (rc > 0) { + zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, bound); + int flags = 0; + if (metadataLength > 0) { + flags = ZMQ_SNDMORE; + } + rc = zmq_msg_send(&msg2, socket, flags); + if (rc == -1) { + L_WARN("Error sending payload msg. %s", strerror(errno)); + zmq_msg_close(&msg2); + } } - } - sendOk = rc > 0; - } else { - zmsg_t *msg = zmsg_new(); - zmsg_addmem(msg, headerData, headerLength); - zmsg_addmem(msg, payloadData, payloadLength); - if (metadataLength > 0) { - zmsg_addmem(msg, metadataData, metadataLength); - } - int rc = zmsg_send(&msg, sender->zmq.socket); - sendOk = rc == 0; + if (rc > 0 && metadataLength > 0) { + zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, bound); + rc = zmq_msg_send(&msg3, socket, 0); + if (rc == -1) { + L_WARN("Error sending metadata msg. %s", strerror(errno)); + zmq_msg_close(&msg3); + } + } + + sendOk = rc > 0; + } else { + zmsg_t *msg = zmsg_new(); + zmsg_addmem(msg, headerData, headerLength); + zmsg_addmem(msg, payloadData, payloadLength); + if (metadataLength > 0) { + zmsg_addmem(msg, metadataData, metadataLength); + } + int rc = zmsg_send(&msg, sender->zmq.socket); + sendOk = rc == 0; - if (!sendOk) { - zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg + if (!sendOk) { + zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg + } + + free(headerData); + free(payloadData); + free(metadataData); } - free(headerData); - free(payloadData); - free(metadataData); - } + pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata); - celix_properties_destroy(message.metadata.metadata); + celix_properties_destroy(metadata); - celixThreadMutex_unlock(&entry->sendLock); - if (sendOk) { - sendCountUpdate = 1; - } else { - sendErrorUpdate = 1; - L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno)); + celixThreadMutex_unlock(&entry->sendLock); + if (sendOk) { + sendCountUpdate = 1; + } else { + sendErrorUpdate = 1; + L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno)); + } } } else { serializationErrorUpdate = 1; diff --git a/bundles/pubsub/pubsub_spi/CMakeLists.txt b/bundles/pubsub/pubsub_spi/CMakeLists.txt index a19131c..03097ec 100644 --- a/bundles/pubsub/pubsub_spi/CMakeLists.txt +++ b/bundles/pubsub/pubsub_spi/CMakeLists.txt @@ -22,7 +22,7 @@ add_library(pubsub_spi STATIC src/pubsub_endpoint.c src/pubsub_utils.c src/pubsub_admin_metrics.c -) + src/pubsub_interceptors_handler.c) set_target_properties(pubsub_spi PROPERTIES OUTPUT_NAME "celix_pubsub_spi") target_include_directories(pubsub_spi PUBLIC diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h new file mode 100644 index 0000000..57765ee --- /dev/null +++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef __PUBSUB_INTERCEPTOR_H +#define __PUBSUB_INTERCEPTOR_H + +#include <stdlib.h> + +#include "celix_properties.h" + +#define PUBSUB_INTERCEPTOR_SERVICE_NAME "pubsub.interceptor" +#define PUBSUB_INTERCEPTOR_SERVICE_VERSION "1.0.0" + +typedef struct pubsub_interceptor_properties { + const char *scope; + const char *topic; +} pubsub_interceptor_properties_t; + +struct pubsub_interceptor { + void *handle; + + bool (*preSend)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); + void (*postSend)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); + bool (*preReceive)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); + void (*postReceive)(void *handle, pubsub_interceptor_properties_t properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); +}; + +typedef struct pubsub_interceptor pubsub_interceptor_t; + +#endif //__PUBSUB_INTERCEPTOR_H diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h new file mode 100644 index 0000000..60461f8 --- /dev/null +++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef PUBSUB_INTERCEPTORS_HANDLER_H +#define PUBSUB_INTERCEPTORS_HANDLER_H + +#include "celix_errno.h" +#include "celix_array_list.h" +#include "pubsub_interceptor.h" +#include "celix_properties.h" + +typedef struct pubsub_interceptors_handler pubsub_interceptors_handler_t; + +celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler); +celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler); + +bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata); +void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata); +bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata); +void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata); + +#endif //PUBSUB_INTERCEPTORS_HANDLER_H diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c new file mode 100644 index 0000000..afd17e7 --- /dev/null +++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "celix_bundle_context.h" +#include "celix_constants.h" +#include "utils.h" + +#include "pubsub_interceptors_handler.h" + +typedef struct entry { + const celix_properties_t *properties; + pubsub_interceptor_t *interceptor; +} entry_t; + +struct pubsub_interceptors_handler { + pubsub_interceptor_properties_t properties; + + celix_array_list_t *interceptors; + + long interceptorsTrackerId; + + celix_bundle_context_t *ctx; + + celix_thread_mutex_t mutex; +}; + +static int referenceCompare(const void *a, const void *b); + +static void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props); +static void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, const celix_properties_t *props); + +celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler) { + celix_status_t status = CELIX_SUCCESS; + + *handler = calloc(1, sizeof(**handler)); + if (!*handler) { + status = CELIX_ENOMEM; + } else { + (*handler)->ctx = ctx; + + (*handler)->properties.scope = scope; + (*handler)->properties.topic = topic; + + (*handler)->interceptors = celix_arrayList_create(); + + celixThreadMutex_create(&(*handler)->mutex, NULL); + + // Create service tracker here, and not in the activator + celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; + opts.filter.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME; + opts.filter.ignoreServiceLanguage = true; + opts.callbackHandle = *handler; + opts.addWithProperties = pubsubInterceptorsHandler_addInterceptor; + opts.removeWithProperties = pubsubInterceptorsHandler_removeInterceptor; + (*handler)->interceptorsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); + } + + return status; +} + +celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler) { + celix_bundleContext_stopTracker(handler->ctx, handler->interceptorsTrackerId); + + free(handler->interceptors); + free(handler); + + return CELIX_SUCCESS; +} + +void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props) { + pubsub_interceptors_handler_t *handler = handle; + celixThreadMutex_lock(&handler->mutex); + + bool exists = false; + for (int i = 0; i < arrayList_size(handler->interceptors); i++) { + entry_t *entry = arrayList_get(handler->interceptors, i); + if (entry->interceptor == svc) { + exists = true; + } + } + if (!exists) { + entry_t *entry = calloc(1, sizeof(*entry)); + entry->properties = props; + entry->interceptor = svc; + celix_arrayList_add(handler->interceptors, entry); + + celix_arrayList_sort(handler->interceptors, referenceCompare); + } + + celixThreadMutex_unlock(&handler->mutex); +} + +void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, __attribute__((unused)) const celix_properties_t *props) { + pubsub_interceptors_handler_t *handler = handle; + for (int i = 0; i < arrayList_size(handler->interceptors); i++) { + entry_t *entry = arrayList_get(handler->interceptors, i); + if (entry->interceptor == svc) { + arrayList_remove(handler->interceptors, i); + break; + } + } +} + +bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) { + bool cont = true; + for (int i = arrayList_size(handler->interceptors) - 1; i >= 0; i--) { + entry_t *entry = arrayList_get(handler->interceptors, i); + + cont = entry->interceptor->preSend(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata); + if (!cont) { + break; + } + } + + return cont; +} + +void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) { + for (int i = arrayList_size(handler->interceptors) - 1; i >= 0; i--) { + entry_t *entry = arrayList_get(handler->interceptors, i); + + entry->interceptor->postSend(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata); + } +} + +bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) { + bool cont = true; + for (int i = 0; i < arrayList_size(handler->interceptors); i++) { + entry_t *entry = arrayList_get(handler->interceptors, i); + + cont = entry->interceptor->preReceive(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata); + if (!cont) { + break; + } + } + + return cont; +} + +void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) { + for (int i = 0; i < arrayList_size(handler->interceptors); i++) { + entry_t *entry = arrayList_get(handler->interceptors, i); + + entry->interceptor->postReceive(entry->interceptor->handle, handler->properties, messageType, messageId, message, metadata); + } +} + +int referenceCompare(const void *a, const void *b) { + const entry_t *aEntry = a; + const entry_t *bEntry = b; + + long servIdA = celix_properties_getAsLong(aEntry->properties, OSGI_FRAMEWORK_SERVICE_ID, 0); + long servIdB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_ID, 0); + + long servRankingA = celix_properties_getAsLong(aEntry->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0); + long servRankingB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0); + + return utils_compareServiceIdsAndRanking(servIdA, servRankingA, servIdB, servRankingB); +} \ No newline at end of file diff --git a/libs/utils/include/celix_array_list.h b/libs/utils/include/celix_array_list.h index aada9f4..ea4fc4a 100644 --- a/libs/utils/include/celix_array_list.h +++ b/libs/utils/include/celix_array_list.h @@ -45,6 +45,8 @@ typedef struct celix_array_list celix_array_list_t; typedef bool (*celix_arrayList_equals_fp)(celix_array_list_entry_t, celix_array_list_entry_t); +typedef int (*celix_arrayList_sort_fp)(const void *, const void *); + celix_array_list_t* celix_arrayList_create(); @@ -99,6 +101,8 @@ void celix_arrayList_removeDouble(celix_array_list_t *list, double val); void celix_arrayList_removeBool(celix_array_list_t *list, bool val); void celix_arrayList_removeSize(celix_array_list_t *list, size_t val); +void celix_arrayList_sort(celix_array_list_t *list, celix_arrayList_sort_fp sortFp); + #ifdef __cplusplus } #endif diff --git a/libs/utils/src/array_list.c b/libs/utils/src/array_list.c index 808d3b1..665b0af 100644 --- a/libs/utils/src/array_list.c +++ b/libs/utils/src/array_list.c @@ -564,3 +564,24 @@ void celix_arrayList_clear(celix_array_list_t *list) { } list->size = 0; } + +#if defined(__APPLE__) +static int celix_arrayList_compare(void *arg, const void * a, const void *b) { +#elif +static int celix_arrayList_compare(const void * a, const void *b, void *arg) { +#endif + const celix_array_list_entry_t *aEntry = a; + const celix_array_list_entry_t *bEntry = b; + + celix_arrayList_sort_fp sort = arg; + + return sort(aEntry->voidPtrVal, bEntry->voidPtrVal); +} + +void celix_arrayList_sort(celix_array_list_t *list, celix_arrayList_sort_fp sortFp) { +#if defined(__APPLE__) + qsort_r(list->elementData, list->size, sizeof(celix_array_list_entry_t), sortFp, celix_arrayList_compare); +#elif + qsort_r(list->elementData, list->size, sizeof(celix_array_list_entry_t), celix_arrayList_compare, sortFp); +#endif +}
