This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/develop by this push:
new b8e3c37 Bugfix/zmq fixes #191 (#199)
b8e3c37 is described below
commit b8e3c3763a0255b5d5e9c91fb85e282d2dad6269
Author: Michael de Lang <[email protected]>
AuthorDate: Thu Apr 16 14:11:37 2020 +0000
Bugfix/zmq fixes #191 (#199)
* Fix mem leaks
* Fix use after free in zmq zerocopy
* Fix issue with not unlocking mutex in some cases
* Proper fix for freeing activator resources
* PR feedback
* re-enable zmq zerocopy tests
---
.../interceptors/src/ps_interceptor_activator.c | 28 +++++------
.../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 57 ++++++++++++++++++----
.../pubsub_spi/src/pubsub_interceptors_handler.c | 3 +-
bundles/pubsub/test/CMakeLists.txt | 4 +-
bundles/pubsub/test/test/test_endpoint_runner.cc | 12 ++---
libs/framework/src/service_tracker.c | 22 +++++----
6 files changed, 84 insertions(+), 42 deletions(-)
diff --git
a/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c
b/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c
index 12a055c..0b5785c 100644
--- a/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c
@@ -26,21 +26,22 @@
struct interceptorActivator {
first_interceptor_t *interceptor;
uint64_t interceptorSvcId;
+ pubsub_interceptor_t interceptorSvc;
second_interceptor_t *secondInterceptor;
uint64_t secondInterceptorSvcId;
+ pubsub_interceptor_t secondInterceptorSvc;
};
static int interceptor_start(struct interceptorActivator *act,
celix_bundle_context_t *ctx) {
- pubsub_interceptor_t *interceptorSvc = calloc(1,sizeof(*interceptorSvc));
first_interceptor_t *interceptor = NULL;
firstInterceptor_create(&interceptor);
- interceptorSvc->handle = interceptor;
- interceptorSvc->preSend = firstInterceptor_preSend;
- interceptorSvc->postSend = firstInterceptor_postSend;
- interceptorSvc->preReceive = firstInterceptor_preReceive;
- interceptorSvc->postReceive = firstInterceptor_postReceive;
+ act->interceptorSvc.handle = interceptor;
+ act->interceptorSvc.preSend = firstInterceptor_preSend;
+ act->interceptorSvc.postSend = firstInterceptor_postSend;
+ act->interceptorSvc.preReceive = firstInterceptor_preReceive;
+ act->interceptorSvc.postReceive = firstInterceptor_postReceive;
act->interceptor = interceptor;
@@ -48,22 +49,21 @@ static int interceptor_start(struct interceptorActivator
*act, celix_bundle_cont
celix_properties_setLong(props, OSGI_FRAMEWORK_SERVICE_RANKING, 10);
celix_service_registration_options_t opts =
CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
- opts.svc = interceptorSvc;
+ opts.svc = &act->interceptorSvc;
opts.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
opts.serviceVersion = PUBSUB_INTERCEPTOR_SERVICE_VERSION;
opts.properties = props;
act->interceptorSvcId =
celix_bundleContext_registerServiceWithOptions(ctx, &opts);
- pubsub_interceptor_t *secondInterceptorSvc = calloc(1,
sizeof(*secondInterceptorSvc));
second_interceptor_t *secondInterceptor = NULL;
secondInterceptor_create(&secondInterceptor);
- secondInterceptorSvc->handle = secondInterceptor;
- secondInterceptorSvc->preSend = secondInterceptor_preSend;
- secondInterceptorSvc->postSend = secondInterceptor_postSend;
- secondInterceptorSvc->preReceive = secondInterceptor_preReceive;
- secondInterceptorSvc->postReceive = secondInterceptor_postReceive;
+ act->secondInterceptorSvc.handle = secondInterceptor;
+ act->secondInterceptorSvc.preSend = secondInterceptor_preSend;
+ act->secondInterceptorSvc.postSend = secondInterceptor_postSend;
+ act->secondInterceptorSvc.preReceive = secondInterceptor_preReceive;
+ act->secondInterceptorSvc.postReceive = secondInterceptor_postReceive;
act->secondInterceptor = secondInterceptor;
@@ -71,7 +71,7 @@ static int interceptor_start(struct interceptorActivator
*act, celix_bundle_cont
celix_properties_setLong(secondProps, OSGI_FRAMEWORK_SERVICE_RANKING, 20);
celix_service_registration_options_t secondOpts =
CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
- secondOpts.svc = secondInterceptorSvc;
+ secondOpts.svc = &act->secondInterceptorSvc;
secondOpts.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
secondOpts.serviceVersion = PUBSUB_INTERCEPTOR_SERVICE_VERSION;
secondOpts.properties = secondProps;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 3accf5d..2d05724 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -111,6 +111,13 @@ typedef struct psa_zmq_bounded_service_entry {
int getCount;
} psa_zmq_bounded_service_entry_t;
+typedef struct psa_zmq_zerocopy_free_entry {
+ pubsub_msg_serializer_t *msgSer;
+ struct iovec *serializedOutput;
+ size_t serializedOutputLen;
+} psa_zmq_zerocopy_free_entry;
+
+
static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t
*requestingBundle, const celix_properties_t *svcProperties);
static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t
*requestingBundle, const celix_properties_t *svcProperties);
static unsigned int rand_range(unsigned int min, unsigned int max);
@@ -500,8 +507,15 @@ pubsub_admin_sender_metrics_t*
pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_se
return result;
}
-static void psa_zmq_freeMsg(void *msg, void *hint __attribute__((unused))) {
- free(msg);
+static void psa_zmq_freeMsg(void *msg, void *hint) {
+ if(hint) {
+ psa_zmq_zerocopy_free_entry *entry = hint;
+ entry->msgSer->freeSerializeMsg(entry->msgSer->handle,
entry->serializedOutput, entry->serializedOutputLen);
+ free(entry->serializedOutput);
+ free(entry);
+ } else {
+ free(msg);
+ }
}
static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId,
const void *inMsg, celix_properties_t *metadata) {
@@ -549,6 +563,9 @@ static int psa_zmq_topicPublicationSend(void* handle,
unsigned int msgTypeId, co
void *payloadData = NULL;
size_t payloadLength = 0;
entry->protSer->encodePayload(entry->protSer->handle,
&message, &payloadData, &payloadLength);
+ if(payloadLength > 1000000) {
+ L_WARN("ERR LARGE PAYLOAD DETECTED\n");
+ }
void *metadataData = NULL;
size_t metadataLength = 0;
@@ -559,6 +576,10 @@ static int psa_zmq_topicPublicationSend(void* handle,
unsigned int msgTypeId, co
message.metadata.metadata = NULL;
}
+ if(metadataLength > 1000000) {
+ L_WARN("ERR LARGE METADATA DETECTED\n");
+ }
+
message.header.msgId = msgTypeId;
message.header.msgMajorVersion = 0;
message.header.msgMinorVersion = 0;
@@ -581,8 +602,12 @@ static int psa_zmq_topicPublicationSend(void* handle,
unsigned int msgTypeId, co
zmq_msg_t msg2; // Payload
zmq_msg_t msg3; // Metadata
void *socket = zsock_resolve(sender->zmq.socket);
+ psa_zmq_zerocopy_free_entry *freeMsgEntry =
malloc(sizeof(psa_zmq_zerocopy_free_entry));
+ freeMsgEntry->msgSer = entry->msgSer;
+ freeMsgEntry->serializedOutput = serializedOutput;
+ freeMsgEntry->serializedOutputLen = serializedOutputLen;
- zmq_msg_init_data(&msg1, headerData, headerLength,
psa_zmq_freeMsg, bound);
+ zmq_msg_init_data(&msg1, headerData, headerLength,
psa_zmq_freeMsg, NULL);
//send header
int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
if (rc == -1) {
@@ -592,7 +617,11 @@ static int psa_zmq_topicPublicationSend(void* handle,
unsigned int msgTypeId, co
//send Payload
if (rc > 0) {
- zmq_msg_init_data(&msg2, payloadData, payloadLength,
psa_zmq_freeMsg, bound);
+ if(metadataLength > 0) {
+ zmq_msg_init_data(&msg2, payloadData,
payloadLength, psa_zmq_freeMsg, NULL);
+ } else {
+ zmq_msg_init_data(&msg2, payloadData,
payloadLength, psa_zmq_freeMsg, freeMsgEntry);
+ }
int flags = 0;
if (metadataLength > 0) {
flags = ZMQ_SNDMORE;
@@ -606,7 +635,7 @@ static int psa_zmq_topicPublicationSend(void* handle,
unsigned int msgTypeId, co
//send MetaData
if (rc > 0 && metadataLength > 0) {
- zmq_msg_init_data(&msg3, metadataData, metadataLength,
psa_zmq_freeMsg, bound);
+ zmq_msg_init_data(&msg3, metadataData, metadataLength,
psa_zmq_freeMsg, freeMsgEntry);
rc = zmq_msg_send(&msg3, socket, 0);
if (rc == -1) {
L_WARN("Error sending metadata msg. %s",
strerror(errno));
@@ -629,16 +658,24 @@ static int psa_zmq_topicPublicationSend(void* handle,
unsigned int msgTypeId, co
zmsg_destroy(&msg); //if send was not ok, no owner
change -> destroy msg
}
- if (headerData) free(headerData);
+ if (headerData) {
+ free(headerData);
+ }
// Note: serialized Payload is deleted by serializer
- if (payloadData && (payloadData !=
message.payload.payload)) free(payloadData);
- if (metadataData) free(metadataData);
+ if (payloadData && (payloadData !=
message.payload.payload)) {
+ free(payloadData);
+ }
+ if (metadataData) {
+ free(metadataData);
+ }
}
pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler,
entry->msgSer->msgName, msgTypeId, inMsg, metadata);
- if (message.metadata.metadata)
celix_properties_destroy(message.metadata.metadata);
- if (serializedOutput) {
+ if (message.metadata.metadata) {
+ celix_properties_destroy(message.metadata.metadata);
+ }
+ if (!bound->parent->zeroCopyEnabled && serializedOutput) {
entry->msgSer->freeSerializeMsg(entry->msgSer->handle,
serializedOutput, serializedOutputLen);
free(serializedOutput);
}
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
index e9dad82..81c3200 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
@@ -117,7 +117,8 @@ void pubsubInterceptorsHandler_removeInterceptor(void
*handle, void *svc, __attr
for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
entry_t *entry = arrayList_get(handler->interceptors, i);
if (entry->interceptor == svc) {
- arrayList_remove(handler->interceptors, i);
+ void *old = arrayList_remove(handler->interceptors, i);
+ free(old);
break;
}
}
diff --git a/bundles/pubsub/test/CMakeLists.txt
b/bundles/pubsub/test/CMakeLists.txt
index bfc6f88..5081064 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -244,6 +244,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
target_include_directories(pubsub_zmq_zerocopy_tests SYSTEM PRIVATE
${CPPUTEST_INCLUDE_DIR} test)
#TODO fix issues with ZeroCopy and reanble test again
-# add_test(NAME pubsub_zmq_zerocopy_tests COMMAND pubsub_zmq_zerocopy_tests
WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_zerocopy_tests,CONTAINER_LOC>)
-# SETUP_TARGET_FOR_COVERAGE(pubsub_zmq_zerocopy_tests_cov
pubsub_zmq_zerocopy_tests
${CMAKE_BINARY_DIR}/coverage/pubsub_zmq_tests/pubsub_zmq_zerocopy_tests ..)
+ add_test(NAME pubsub_zmq_zerocopy_tests COMMAND pubsub_zmq_zerocopy_tests
WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_zerocopy_tests,CONTAINER_LOC>)
+ SETUP_TARGET_FOR_COVERAGE(pubsub_zmq_zerocopy_tests_cov
pubsub_zmq_zerocopy_tests
${CMAKE_BINARY_DIR}/coverage/pubsub_zmq_tests/pubsub_zmq_zerocopy_tests ..)
endif ()
diff --git a/bundles/pubsub/test/test/test_endpoint_runner.cc
b/bundles/pubsub/test/test/test_endpoint_runner.cc
index 0b7471c..e1daa6e 100644
--- a/bundles/pubsub/test/test/test_endpoint_runner.cc
+++ b/bundles/pubsub/test/test/test_endpoint_runner.cc
@@ -31,19 +31,19 @@ int main(int argc, char **argv) {
}
TEST_GROUP(PUBSUB_INT_GROUP) {
- celix_framework_t *fw = NULL;
- celix_bundle_context_t *ctx = NULL;
- void setup() {
+ celix_framework_t *fw = nullptr;
+ celix_bundle_context_t *ctx = nullptr;
+ void setup() override {
celixLauncher_launch("config.properties", &fw);
ctx = celix_framework_getFrameworkContext(fw);
}
- void teardown() {
+ void teardown() override {
celixLauncher_stop(fw);
celixLauncher_waitForShutdown(fw);
celixLauncher_destroy(fw);
- ctx = NULL;
- fw = NULL;
+ ctx = nullptr;
+ fw = nullptr;
}
};
diff --git a/libs/framework/src/service_tracker.c
b/libs/framework/src/service_tracker.c
index 900efef..20e9e60 100644
--- a/libs/framework/src/service_tracker.c
+++ b/libs/framework/src/service_tracker.c
@@ -248,16 +248,20 @@ celix_status_t serviceTracker_close(service_tracker_pt
tracker) {
if (instance != NULL) {
celixThreadRwlock_writeLock(&instance->lock);
unsigned int size = celix_arrayList_size(instance->trackedServices);
- celix_tracked_entry_t *trackedEntries[size];
- for (unsigned int i = 0u; i <
arrayList_size(instance->trackedServices); i++) {
- trackedEntries[i] = (celix_tracked_entry_t *)
arrayList_get(instance->trackedServices, i);
- }
- arrayList_clear(instance->trackedServices);
- celixThreadRwlock_unlock(&instance->lock);
+ if(size > 0) {
+ celix_tracked_entry_t *trackedEntries[size];
+ for (unsigned int i = 0u; i < size; i++) {
+ trackedEntries[i] = (celix_tracked_entry_t *)
arrayList_get(instance->trackedServices, i);
+ }
+ arrayList_clear(instance->trackedServices);
+ celixThreadRwlock_unlock(&instance->lock);
- //loop trough tracked entries an untrack
- for (unsigned int i = 0u; i < size; i++) {
- serviceTracker_untrackTracked(instance, trackedEntries[i]);
+ //loop trough tracked entries an untrack
+ for (unsigned int i = 0u; i < size; i++) {
+ serviceTracker_untrackTracked(instance, trackedEntries[i]);
+ }
+ } else {
+ celixThreadRwlock_unlock(&instance->lock);
}
celixThreadMutex_lock(&instance->closingLock);