This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/develop by this push:
     new b8e3c37  Bugfix/zmq fixes #191 (#199)
b8e3c37 is described below

commit b8e3c3763a0255b5d5e9c91fb85e282d2dad6269
Author: Michael de Lang <[email protected]>
AuthorDate: Thu Apr 16 14:11:37 2020 +0000

    Bugfix/zmq fixes #191 (#199)
    
    * Fix mem leaks
    * Fix use after free in zmq zerocopy
    
    * Fix issue with not unlocking mutex in some cases
    
    * Proper fix for freeing activator resources
    
    * PR feedback
    
    * re-enable zmq zerocopy tests
---
 .../interceptors/src/ps_interceptor_activator.c    | 28 +++++------
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 57 ++++++++++++++++++----
 .../pubsub_spi/src/pubsub_interceptors_handler.c   |  3 +-
 bundles/pubsub/test/CMakeLists.txt                 |  4 +-
 bundles/pubsub/test/test/test_endpoint_runner.cc   | 12 ++---
 libs/framework/src/service_tracker.c               | 22 +++++----
 6 files changed, 84 insertions(+), 42 deletions(-)

diff --git 
a/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c 
b/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c
index 12a055c..0b5785c 100644
--- a/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c
+++ b/bundles/pubsub/examples/pubsub/interceptors/src/ps_interceptor_activator.c
@@ -26,21 +26,22 @@
 struct interceptorActivator {
     first_interceptor_t *interceptor;
     uint64_t interceptorSvcId;
+    pubsub_interceptor_t interceptorSvc;
 
     second_interceptor_t *secondInterceptor;
     uint64_t secondInterceptorSvcId;
+    pubsub_interceptor_t secondInterceptorSvc;
 };
 
 static int interceptor_start(struct interceptorActivator *act, 
celix_bundle_context_t *ctx) {
-    pubsub_interceptor_t *interceptorSvc = calloc(1,sizeof(*interceptorSvc));
     first_interceptor_t *interceptor = NULL;
     firstInterceptor_create(&interceptor);
 
-    interceptorSvc->handle = interceptor;
-    interceptorSvc->preSend = firstInterceptor_preSend;
-    interceptorSvc->postSend = firstInterceptor_postSend;
-    interceptorSvc->preReceive = firstInterceptor_preReceive;
-    interceptorSvc->postReceive = firstInterceptor_postReceive;
+    act->interceptorSvc.handle = interceptor;
+    act->interceptorSvc.preSend = firstInterceptor_preSend;
+    act->interceptorSvc.postSend = firstInterceptor_postSend;
+    act->interceptorSvc.preReceive = firstInterceptor_preReceive;
+    act->interceptorSvc.postReceive = firstInterceptor_postReceive;
 
     act->interceptor = interceptor;
 
@@ -48,22 +49,21 @@ static int interceptor_start(struct interceptorActivator 
*act, celix_bundle_cont
     celix_properties_setLong(props, OSGI_FRAMEWORK_SERVICE_RANKING, 10);
 
     celix_service_registration_options_t opts = 
CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
-    opts.svc = interceptorSvc;
+    opts.svc = &act->interceptorSvc;
     opts.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
     opts.serviceVersion = PUBSUB_INTERCEPTOR_SERVICE_VERSION;
     opts.properties = props;
 
     act->interceptorSvcId = 
celix_bundleContext_registerServiceWithOptions(ctx, &opts);
 
-    pubsub_interceptor_t *secondInterceptorSvc = calloc(1, 
sizeof(*secondInterceptorSvc));
     second_interceptor_t *secondInterceptor = NULL;
     secondInterceptor_create(&secondInterceptor);
 
-    secondInterceptorSvc->handle = secondInterceptor;
-    secondInterceptorSvc->preSend = secondInterceptor_preSend;
-    secondInterceptorSvc->postSend = secondInterceptor_postSend;
-    secondInterceptorSvc->preReceive = secondInterceptor_preReceive;
-    secondInterceptorSvc->postReceive = secondInterceptor_postReceive;
+    act->secondInterceptorSvc.handle = secondInterceptor;
+    act->secondInterceptorSvc.preSend = secondInterceptor_preSend;
+    act->secondInterceptorSvc.postSend = secondInterceptor_postSend;
+    act->secondInterceptorSvc.preReceive = secondInterceptor_preReceive;
+    act->secondInterceptorSvc.postReceive = secondInterceptor_postReceive;
 
     act->secondInterceptor = secondInterceptor;
 
@@ -71,7 +71,7 @@ static int interceptor_start(struct interceptorActivator 
*act, celix_bundle_cont
     celix_properties_setLong(secondProps, OSGI_FRAMEWORK_SERVICE_RANKING, 20);
 
     celix_service_registration_options_t secondOpts = 
CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
-    secondOpts.svc = secondInterceptorSvc;
+    secondOpts.svc = &act->secondInterceptorSvc;
     secondOpts.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME;
     secondOpts.serviceVersion = PUBSUB_INTERCEPTOR_SERVICE_VERSION;
     secondOpts.properties = secondProps;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c 
b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 3accf5d..2d05724 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -111,6 +111,13 @@ typedef struct psa_zmq_bounded_service_entry {
     int getCount;
 } psa_zmq_bounded_service_entry_t;
 
+typedef struct psa_zmq_zerocopy_free_entry {
+    pubsub_msg_serializer_t *msgSer;
+    struct iovec *serializedOutput;
+    size_t serializedOutputLen;
+} psa_zmq_zerocopy_free_entry;
+
+
 static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t 
*requestingBundle, const celix_properties_t *svcProperties);
 static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t 
*requestingBundle, const celix_properties_t *svcProperties);
 static unsigned int rand_range(unsigned int min, unsigned int max);
@@ -500,8 +507,15 @@ pubsub_admin_sender_metrics_t* 
pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_se
     return result;
 }
 
-static void psa_zmq_freeMsg(void *msg, void *hint __attribute__((unused))) {
-    free(msg);
+static void psa_zmq_freeMsg(void *msg, void *hint) {
+    if(hint) {
+        psa_zmq_zerocopy_free_entry *entry = hint;
+        entry->msgSer->freeSerializeMsg(entry->msgSer->handle, 
entry->serializedOutput, entry->serializedOutputLen);
+        free(entry->serializedOutput);
+        free(entry);
+    } else {
+        free(msg);
+    }
 }
 
 static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, 
const void *inMsg, celix_properties_t *metadata) {
@@ -549,6 +563,9 @@ static int psa_zmq_topicPublicationSend(void* handle, 
unsigned int msgTypeId, co
                 void *payloadData = NULL;
                 size_t payloadLength = 0;
                 entry->protSer->encodePayload(entry->protSer->handle, 
&message, &payloadData, &payloadLength);
+                if(payloadLength > 1000000) {
+                    L_WARN("ERR LARGE PAYLOAD DETECTED\n");
+                }
 
                 void *metadataData = NULL;
                 size_t metadataLength = 0;
@@ -559,6 +576,10 @@ static int psa_zmq_topicPublicationSend(void* handle, 
unsigned int msgTypeId, co
                     message.metadata.metadata = NULL;
                 }
 
+                if(metadataLength > 1000000) {
+                    L_WARN("ERR LARGE METADATA DETECTED\n");
+                }
+
                 message.header.msgId = msgTypeId;
                 message.header.msgMajorVersion = 0;
                 message.header.msgMinorVersion = 0;
@@ -581,8 +602,12 @@ static int psa_zmq_topicPublicationSend(void* handle, 
unsigned int msgTypeId, co
                     zmq_msg_t msg2; // Payload
                     zmq_msg_t msg3; // Metadata
                     void *socket = zsock_resolve(sender->zmq.socket);
+                    psa_zmq_zerocopy_free_entry *freeMsgEntry = 
malloc(sizeof(psa_zmq_zerocopy_free_entry));
+                    freeMsgEntry->msgSer = entry->msgSer;
+                    freeMsgEntry->serializedOutput = serializedOutput;
+                    freeMsgEntry->serializedOutputLen = serializedOutputLen;
 
-                    zmq_msg_init_data(&msg1, headerData, headerLength, 
psa_zmq_freeMsg, bound);
+                    zmq_msg_init_data(&msg1, headerData, headerLength, 
psa_zmq_freeMsg, NULL);
                     //send header
                     int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
                     if (rc == -1) {
@@ -592,7 +617,11 @@ static int psa_zmq_topicPublicationSend(void* handle, 
unsigned int msgTypeId, co
 
                     //send Payload
                     if (rc > 0) {
-                        zmq_msg_init_data(&msg2, payloadData, payloadLength, 
psa_zmq_freeMsg, bound);
+                        if(metadataLength > 0) {
+                            zmq_msg_init_data(&msg2, payloadData, 
payloadLength, psa_zmq_freeMsg, NULL);
+                        } else {
+                            zmq_msg_init_data(&msg2, payloadData, 
payloadLength, psa_zmq_freeMsg, freeMsgEntry);
+                        }
                         int flags = 0;
                         if (metadataLength > 0) {
                             flags = ZMQ_SNDMORE;
@@ -606,7 +635,7 @@ static int psa_zmq_topicPublicationSend(void* handle, 
unsigned int msgTypeId, co
 
                     //send MetaData
                     if (rc > 0 && metadataLength > 0) {
-                        zmq_msg_init_data(&msg3, metadataData, metadataLength, 
psa_zmq_freeMsg, bound);
+                        zmq_msg_init_data(&msg3, metadataData, metadataLength, 
psa_zmq_freeMsg, freeMsgEntry);
                         rc = zmq_msg_send(&msg3, socket, 0);
                         if (rc == -1) {
                             L_WARN("Error sending metadata msg. %s", 
strerror(errno));
@@ -629,16 +658,24 @@ static int psa_zmq_topicPublicationSend(void* handle, 
unsigned int msgTypeId, co
                         zmsg_destroy(&msg); //if send was not ok, no owner 
change -> destroy msg
                     }
 
-                    if (headerData) free(headerData);
+                    if (headerData) {
+                        free(headerData);
+                    }
                     // Note: serialized Payload is deleted by serializer
-                    if (payloadData && (payloadData != 
message.payload.payload)) free(payloadData);
-                    if (metadataData) free(metadataData);
+                    if (payloadData && (payloadData != 
message.payload.payload)) {
+                        free(payloadData);
+                    }
+                    if (metadataData) {
+                        free(metadataData);
+                    }
                 }
 
                 
pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, 
entry->msgSer->msgName, msgTypeId, inMsg, metadata);
 
-                if (message.metadata.metadata) 
celix_properties_destroy(message.metadata.metadata);
-                if (serializedOutput) {
+                if (message.metadata.metadata) {
+                    celix_properties_destroy(message.metadata.metadata);
+                }
+                if (!bound->parent->zeroCopyEnabled && serializedOutput) {
                     entry->msgSer->freeSerializeMsg(entry->msgSer->handle, 
serializedOutput, serializedOutputLen);
                     free(serializedOutput);
                 }
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c 
b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
index e9dad82..81c3200 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c
@@ -117,7 +117,8 @@ void pubsubInterceptorsHandler_removeInterceptor(void 
*handle, void *svc, __attr
     for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) {
         entry_t *entry = arrayList_get(handler->interceptors, i);
         if (entry->interceptor == svc) {
-            arrayList_remove(handler->interceptors, i);
+            void *old = arrayList_remove(handler->interceptors, i);
+            free(old);
             break;
         }
     }
diff --git a/bundles/pubsub/test/CMakeLists.txt 
b/bundles/pubsub/test/CMakeLists.txt
index bfc6f88..5081064 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -244,6 +244,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     target_include_directories(pubsub_zmq_zerocopy_tests SYSTEM PRIVATE 
${CPPUTEST_INCLUDE_DIR} test)
 
     #TODO fix issues with ZeroCopy and reanble test again
-#    add_test(NAME pubsub_zmq_zerocopy_tests COMMAND pubsub_zmq_zerocopy_tests 
WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_zerocopy_tests,CONTAINER_LOC>)
-#    SETUP_TARGET_FOR_COVERAGE(pubsub_zmq_zerocopy_tests_cov 
pubsub_zmq_zerocopy_tests 
${CMAKE_BINARY_DIR}/coverage/pubsub_zmq_tests/pubsub_zmq_zerocopy_tests ..)
+    add_test(NAME pubsub_zmq_zerocopy_tests COMMAND pubsub_zmq_zerocopy_tests 
WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_zmq_zerocopy_tests,CONTAINER_LOC>)
+    SETUP_TARGET_FOR_COVERAGE(pubsub_zmq_zerocopy_tests_cov 
pubsub_zmq_zerocopy_tests 
${CMAKE_BINARY_DIR}/coverage/pubsub_zmq_tests/pubsub_zmq_zerocopy_tests ..)
 endif ()
diff --git a/bundles/pubsub/test/test/test_endpoint_runner.cc 
b/bundles/pubsub/test/test/test_endpoint_runner.cc
index 0b7471c..e1daa6e 100644
--- a/bundles/pubsub/test/test/test_endpoint_runner.cc
+++ b/bundles/pubsub/test/test/test_endpoint_runner.cc
@@ -31,19 +31,19 @@ int main(int argc, char **argv) {
 }
 
 TEST_GROUP(PUBSUB_INT_GROUP) {
-    celix_framework_t *fw = NULL;
-    celix_bundle_context_t *ctx = NULL;
-    void setup() {
+    celix_framework_t *fw = nullptr;
+    celix_bundle_context_t *ctx = nullptr;
+    void setup() override {
         celixLauncher_launch("config.properties", &fw);
         ctx = celix_framework_getFrameworkContext(fw);
     }
 
-    void teardown() {
+    void teardown() override {
         celixLauncher_stop(fw);
         celixLauncher_waitForShutdown(fw);
         celixLauncher_destroy(fw);
-        ctx = NULL;
-        fw = NULL;
+        ctx = nullptr;
+        fw = nullptr;
     }
 };
 
diff --git a/libs/framework/src/service_tracker.c 
b/libs/framework/src/service_tracker.c
index 900efef..20e9e60 100644
--- a/libs/framework/src/service_tracker.c
+++ b/libs/framework/src/service_tracker.c
@@ -248,16 +248,20 @@ celix_status_t serviceTracker_close(service_tracker_pt 
tracker) {
     if (instance != NULL) {
         celixThreadRwlock_writeLock(&instance->lock);
         unsigned int size = celix_arrayList_size(instance->trackedServices);
-        celix_tracked_entry_t *trackedEntries[size];
-        for (unsigned int i = 0u; i < 
arrayList_size(instance->trackedServices); i++) {
-            trackedEntries[i] = (celix_tracked_entry_t *) 
arrayList_get(instance->trackedServices, i);
-        }
-        arrayList_clear(instance->trackedServices);
-        celixThreadRwlock_unlock(&instance->lock);
+        if(size > 0) {
+            celix_tracked_entry_t *trackedEntries[size];
+            for (unsigned int i = 0u; i < size; i++) {
+                trackedEntries[i] = (celix_tracked_entry_t *) 
arrayList_get(instance->trackedServices, i);
+            }
+            arrayList_clear(instance->trackedServices);
+            celixThreadRwlock_unlock(&instance->lock);
 
-        //loop trough tracked entries an untrack
-        for (unsigned int i = 0u; i < size; i++) {
-            serviceTracker_untrackTracked(instance, trackedEntries[i]);
+            //loop trough tracked entries an untrack
+            for (unsigned int i = 0u; i < size; i++) {
+                serviceTracker_untrackTracked(instance, trackedEntries[i]);
+            }
+        } else {
+            celixThreadRwlock_unlock(&instance->lock);
         }
 
         celixThreadMutex_lock(&instance->closingLock);

Reply via email to