This is an automated email from the ASF dual-hosted git repository.

xuzhenbao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/master by this push:
     new bfddc04d fix psa-tcp use-after-free(issue #405)
     new ff6e2961 Merge pull request #500 from xuzhenbao/psa_bug_fix
bfddc04d is described below

commit bfddc04d460e3e82c1b03e6f6bed777de66c441b
Author: xuzhenbao <[email protected]>
AuthorDate: Tue Mar 14 09:56:03 2023 +0800

    fix psa-tcp use-after-free(issue #405)
---
 .../src/pubsub_tcp_topic_receiver.c                | 101 ++++++++++++---------
 1 file changed, 59 insertions(+), 42 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 597e4ff4..a02560b7 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -35,6 +35,8 @@
 #include <pubsub_utils.h>
 #include "pubsub_interceptors_handler.h"
 #include <celix_api.h>
+#include "celix_string_hash_map.h"
+#include "celix_long_hash_map.h"
 
 #ifndef UUID_STR_LEN
 #define UUID_STR_LEN  37
@@ -74,14 +76,14 @@ struct pubsub_tcp_topic_receiver {
 
     struct {
         celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = tcp url, value = 
psa_tcp_requested_connection_entry_t*
+        celix_string_hash_map_t *map; //key = tcp url, value = 
psa_tcp_requested_connection_entry_t*
         bool allConnected; //true if all requestedConnection are connected
     } requestedConnections;
 
     long subscriberTrackerId;
     struct {
         celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = long svc id, value = 
psa_tcp_subscriber_entry_t
+        celix_long_hash_map_t *map; //key = long svc id, value = 
psa_tcp_subscriber_entry_t
         bool allInitialized;
     } subscribers;
 };
@@ -151,6 +153,17 @@ pubsub_tcp_topic_receiver_t 
*pubsub_tcpTopicReceiver_create(celix_bundle_context
     // property is in ms, timeout value in us. (convert ms to us).
     receiver->timeout = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT,
                                                               
PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT) * 1000;
+
+    celixThreadMutex_create(&receiver->thread.mutex, NULL);
+
+    // receiver->socketHandler depends on the mutexes and maps below, so initialize them first.
+    celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
+    celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
+    celix_string_hash_map_create_options_t reqConsMapOpts = 
CELIX_EMPTY_STRING_HASH_MAP_CREATE_OPTIONS;
+    reqConsMapOpts.storeKeysWeakly = true;
+    receiver->requestedConnections.map = 
celix_stringHashMap_createWithOptions(&reqConsMapOpts);
+    receiver->subscribers.map = celix_longHashMap_create();
+
     /* When it's an endpoint share the socket with the sender */
     if (passiveKey != NULL) {
         celixThreadMutex_lock(&handlerStore->mutex);
@@ -190,12 +203,6 @@ pubsub_tcp_topic_receiver_t 
*pubsub_tcpTopicReceiver_create(celix_bundle_context
         pubsub_tcpHandler_setReceiveRetryCnt(receiver->socketHandler, 
(unsigned int) retryCnt);
         pubsub_tcpHandler_setReceiveTimeOut(receiver->socketHandler, 
rcvTimeout);
     }
-    celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
-    celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
-    celixThreadMutex_create(&receiver->thread.mutex, NULL);
-
-    receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
-    receiver->requestedConnections.map = hashMap_create(utils_stringHash, 
NULL, utils_stringEquals, NULL);
 
     if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) && 
(!receiver->isPassive)) {
         char *urlsCopy = celix_utils_strdup(staticConnectUrls);
@@ -207,7 +214,7 @@ pubsub_tcp_topic_receiver_t 
*pubsub_tcpTopicReceiver_create(celix_bundle_context
             entry->connected = false;
             entry->url = celix_utils_strdup(url);
             entry->parent = receiver;
-            hashMap_put(receiver->requestedConnections.map, entry->url, entry);
+            celix_stringHashMap_put(receiver->requestedConnections.map, 
entry->url, entry);
             receiver->requestedConnections.allConnected = false;
         }
         free(urlsCopy);
@@ -238,6 +245,12 @@ pubsub_tcp_topic_receiver_t 
*pubsub_tcpTopicReceiver_create(celix_bundle_context
     }
 
     if (receiver->socketHandler == NULL) {
+        celix_longHashMap_destroy(receiver->subscribers.map);
+        celix_stringHashMap_destroy(receiver->requestedConnections.map);
+        celixThreadMutex_destroy(&receiver->subscribers.mutex);
+        celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
+        celixThreadMutex_destroy(&receiver->thread.mutex);
+        pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
         if (receiver->scope != NULL) {
             free(receiver->scope);
         }
@@ -252,6 +265,8 @@ pubsub_tcp_topic_receiver_t 
*pubsub_tcpTopicReceiver_create(celix_bundle_context
 void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
     if (receiver != NULL) {
 
+        celix_bundleContext_stopTracker(receiver->ctx, 
receiver->subscriberTrackerId);
+
         celixThreadMutex_lock(&receiver->thread.mutex);
         if (receiver->thread.running) {
             receiver->thread.running = false;
@@ -259,34 +274,36 @@ void 
pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
             celixThread_join(receiver->thread.thread, NULL);
         }
 
-        celix_bundleContext_stopTracker(receiver->ctx, 
receiver->subscriberTrackerId);
+        pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, NULL, 
NULL);
+        
pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, NULL, 
NULL, NULL);
+        if ((receiver->socketHandler) && (receiver->sharedSocketHandler == 
NULL)) {
+            pubsub_tcpHandler_destroy(receiver->socketHandler);
+            receiver->socketHandler = NULL;
+        }
 
         celixThreadMutex_lock(&receiver->subscribers.mutex);
-        hashMap_destroy(receiver->subscribers.map, false, true);
+        CELIX_LONG_HASH_MAP_ITERATE(receiver->subscribers.map, iter) {
+            free(iter.value.ptrValue);
+        }
+        celix_longHashMap_destroy(receiver->subscribers.map);
         celixThreadMutex_unlock(&receiver->subscribers.mutex);
 
         celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-        hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->requestedConnections.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_tcp_requested_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+
+        CELIX_STRING_HASH_MAP_ITERATE(receiver->requestedConnections.map, 
iter) {
+            psa_tcp_requested_connection_entry_t *entry = iter.value.ptrValue;
             if (entry != NULL) {
                 free(entry->url);
                 free(entry);
             }
         }
-        hashMap_destroy(receiver->requestedConnections.map, false, false);
+        celix_stringHashMap_destroy(receiver->requestedConnections.map);
         celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 
         celixThreadMutex_destroy(&receiver->subscribers.mutex);
         celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
         celixThreadMutex_destroy(&receiver->thread.mutex);
 
-        pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, NULL, 
NULL);
-        
pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, NULL, 
NULL, NULL);
-        if ((receiver->socketHandler) && (receiver->sharedSocketHandler == 
NULL)) {
-            pubsub_tcpHandler_destroy(receiver->socketHandler);
-            receiver->socketHandler = NULL;
-        }
         pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
         if (receiver->scope != NULL) {
             free(receiver->scope);
@@ -326,9 +343,8 @@ void 
pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiv
         }
         free(interface_url);
     } else {
-        hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->requestedConnections.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_tcp_requested_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+        CELIX_STRING_HASH_MAP_ITERATE(receiver->requestedConnections.map, 
iter) {
+            psa_tcp_requested_connection_entry_t *entry = iter.value.ptrValue;
             char *url = NULL;
             asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" 
: "");
             if (entry->connected) {
@@ -354,14 +370,14 @@ void pubsub_tcpTopicReceiver_connectTo(
             url);
 
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-    psa_tcp_requested_connection_entry_t *entry = 
hashMap_get(receiver->requestedConnections.map, url);
+    psa_tcp_requested_connection_entry_t *entry = 
celix_stringHashMap_get(receiver->requestedConnections.map, url);
     if (entry == NULL) {
         entry = calloc(1, sizeof(*entry));
         entry->url = celix_utils_strdup(url);
         entry->connected = false;
         entry->statically = false;
         entry->parent = receiver;
-        hashMap_put(receiver->requestedConnections.map, (void *) entry->url, 
entry);
+        celix_stringHashMap_put(receiver->requestedConnections.map, (void *) 
entry->url, entry);
         receiver->requestedConnections.allConnected = false;
     }
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
@@ -376,13 +392,14 @@ void 
pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receive
             url);
 
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-    psa_tcp_requested_connection_entry_t *entry = 
hashMap_remove(receiver->requestedConnections.map, url);
+    psa_tcp_requested_connection_entry_t *entry = 
celix_stringHashMap_get(receiver->requestedConnections.map, url);
     if (entry != NULL) {
         int rc = pubsub_tcpHandler_disconnect(receiver->socketHandler, 
entry->url);
         if (rc < 0)
             L_WARN("[PSA_TCP] Error disconnecting from tcp url %s. (%s)", url, 
strerror(errno));
     }
     if (entry != NULL) {
+        celix_stringHashMap_remove(receiver->requestedConnections.map, url);
         free(entry->url);
         free(entry);
     }
@@ -413,7 +430,7 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void 
*handle, void *svc, const
     entry->initialized = false;
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
-    hashMap_put(receiver->subscribers.map, (void*)svcId, entry);
+    celix_longHashMap_put(receiver->subscribers.map, svcId, entry);
     receiver->subscribers.allInitialized = false;
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
@@ -424,7 +441,8 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void 
*handle, void *svc __a
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, 
-1);
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
-    psa_tcp_subscriber_entry_t *entry = 
hashMap_remove(receiver->subscribers.map, (void *)svcId);
+    psa_tcp_subscriber_entry_t *entry = 
celix_longHashMap_get(receiver->subscribers.map, svcId);
+    celix_longHashMap_remove(receiver->subscribers.map, svcId);
     free(entry);
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
@@ -432,12 +450,13 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void 
*handle, void *svc __a
 static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char* 
msgFqn, const pubsub_protocol_message_t *message, void** msg, bool* release, 
const celix_properties_t* metadata) {
     *release = true;
     celixThreadMutex_lock(&receiver->subscribers.mutex);
-    hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_tcp_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter);
+    celix_long_hash_map_iterator_t iter = 
celix_longHashMap_begin(receiver->subscribers.map);
+    while(!celix_longHashMapIterator_isEnd(&iter)) {
+        psa_tcp_subscriber_entry_t* entry = iter.value.ptrValue;
+        celix_longHashMapIterator_next(&iter);
         if (entry != NULL && entry->subscriberSvc->receive != NULL) {
             entry->subscriberSvc->receive(entry->subscriberSvc->handle, 
msgFqn, message->header.msgId, *msg, metadata, release);
-            if (!(*release) && hashMapIterator_hasNext(&iter)) { //receive 
function has taken ownership, deserialize again for new message
+            if (!(*release) && !celix_longHashMapIterator_isEnd(&iter)) { 
//receive function has taken ownership, deserialize again for new message
                 struct iovec deSerializeBuffer;
                 deSerializeBuffer.iov_base = message->payload.payload;
                 deSerializeBuffer.iov_len = message->payload.length;
@@ -578,9 +597,8 @@ static void 
psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
     if (!receiver->requestedConnections.allConnected) {
         bool allConnected = true;
-        hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->requestedConnections.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_tcp_requested_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+        CELIX_STRING_HASH_MAP_ITERATE(receiver->requestedConnections.map, 
iter) {
+            psa_tcp_requested_connection_entry_t *entry = iter.value.ptrValue;
             if ((entry) && (!entry->connected) && (!receiver->isPassive)) {
                 int rc = 
pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url);
                 if (rc < 0) {
@@ -601,13 +619,13 @@ static void psa_tcp_connectHandler(void *handle, const 
char *url, bool lock) {
             url);
     if (lock)
         celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-    psa_tcp_requested_connection_entry_t *entry = 
hashMap_get(receiver->requestedConnections.map, url);
+    psa_tcp_requested_connection_entry_t *entry = 
celix_stringHashMap_get(receiver->requestedConnections.map, url);
     if (entry == NULL) {
         entry = calloc(1, sizeof(*entry));
         entry->parent = receiver;
         entry->url = celix_utils_strdup(url);
         entry->statically = true;
-        hashMap_put(receiver->requestedConnections.map, (void *) entry->url, 
entry);
+        celix_stringHashMap_put(receiver->requestedConnections.map, (void *) 
entry->url, entry);
         receiver->requestedConnections.allConnected = false;
     }
     entry->connected = true;
@@ -623,7 +641,7 @@ static void psa_tcp_disConnectHandler(void *handle, const 
char *url, bool lock)
             url);
     if (lock)
         celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-    psa_tcp_requested_connection_entry_t *entry = 
hashMap_get(receiver->requestedConnections.map, url);
+    psa_tcp_requested_connection_entry_t *entry = 
celix_stringHashMap_get(receiver->requestedConnections.map, url);
     if (entry != NULL) {
         entry->connected = false;
         receiver->requestedConnections.allConnected = false;
@@ -636,9 +654,8 @@ static void 
psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     if (!receiver->subscribers.allInitialized) {
         bool allInitialized = true;
-        hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_tcp_subscriber_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+        CELIX_LONG_HASH_MAP_ITERATE(receiver->subscribers.map, iter) {
+            psa_tcp_subscriber_entry_t *entry = iter.value.ptrValue;
             if (!entry->initialized) {
                 int rc = 0;
                 if (entry->subscriberSvc->init != NULL) {

Reply via email to