This is an automated email from the ASF dual-hosted git repository.
xuzhenbao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/master by this push:
new bfddc04d fix psa-tcp use-after-free(issue #405)
new ff6e2961 Merge pull request #500 from xuzhenbao/psa_bug_fix
bfddc04d is described below
commit bfddc04d460e3e82c1b03e6f6bed777de66c441b
Author: xuzhenbao <[email protected]>
AuthorDate: Tue Mar 14 09:56:03 2023 +0800
fix psa-tcp use-after-free(issue #405)
---
.../src/pubsub_tcp_topic_receiver.c | 101 ++++++++++++---------
1 file changed, 59 insertions(+), 42 deletions(-)
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 597e4ff4..a02560b7 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -35,6 +35,8 @@
#include <pubsub_utils.h>
#include "pubsub_interceptors_handler.h"
#include <celix_api.h>
+#include "celix_string_hash_map.h"
+#include "celix_long_hash_map.h"
#ifndef UUID_STR_LEN
#define UUID_STR_LEN 37
@@ -74,14 +76,14 @@ struct pubsub_tcp_topic_receiver {
struct {
celix_thread_mutex_t mutex;
- hash_map_t *map; //key = tcp url, value =
psa_tcp_requested_connection_entry_t*
+ celix_string_hash_map_t *map; //key = tcp url, value =
psa_tcp_requested_connection_entry_t*
bool allConnected; //true if all requestedConnection are connected
} requestedConnections;
long subscriberTrackerId;
struct {
celix_thread_mutex_t mutex;
- hash_map_t *map; //key = long svc id, value =
psa_tcp_subscriber_entry_t
+ celix_long_hash_map_t *map; //key = long svc id, value =
psa_tcp_subscriber_entry_t
bool allInitialized;
} subscribers;
};
@@ -151,6 +153,17 @@ pubsub_tcp_topic_receiver_t
*pubsub_tcpTopicReceiver_create(celix_bundle_context
// property is in ms, timeout value in us. (convert ms to us).
receiver->timeout = celix_bundleContext_getPropertyAsLong(ctx,
PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT,
PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT) * 1000;
+
+ celixThreadMutex_create(&receiver->thread.mutex, NULL);
+
+ //receiver->socketHandler depends on the mutexes and maps below, so we
must initialize them first.
+ celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
+ celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
+ celix_string_hash_map_create_options_t reqConsMapOpts =
CELIX_EMPTY_STRING_HASH_MAP_CREATE_OPTIONS;
+ reqConsMapOpts.storeKeysWeakly = true;
+ receiver->requestedConnections.map =
celix_stringHashMap_createWithOptions(&reqConsMapOpts);
+ receiver->subscribers.map = celix_longHashMap_create();
+
/* When it's an endpoint share the socket with the sender */
if (passiveKey != NULL) {
celixThreadMutex_lock(&handlerStore->mutex);
@@ -190,12 +203,6 @@ pubsub_tcp_topic_receiver_t
*pubsub_tcpTopicReceiver_create(celix_bundle_context
pubsub_tcpHandler_setReceiveRetryCnt(receiver->socketHandler,
(unsigned int) retryCnt);
pubsub_tcpHandler_setReceiveTimeOut(receiver->socketHandler,
rcvTimeout);
}
- celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
- celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
- celixThreadMutex_create(&receiver->thread.mutex, NULL);
-
- receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
- receiver->requestedConnections.map = hashMap_create(utils_stringHash,
NULL, utils_stringEquals, NULL);
if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) &&
(!receiver->isPassive)) {
char *urlsCopy = celix_utils_strdup(staticConnectUrls);
@@ -207,7 +214,7 @@ pubsub_tcp_topic_receiver_t
*pubsub_tcpTopicReceiver_create(celix_bundle_context
entry->connected = false;
entry->url = celix_utils_strdup(url);
entry->parent = receiver;
- hashMap_put(receiver->requestedConnections.map, entry->url, entry);
+ celix_stringHashMap_put(receiver->requestedConnections.map,
entry->url, entry);
receiver->requestedConnections.allConnected = false;
}
free(urlsCopy);
@@ -238,6 +245,12 @@ pubsub_tcp_topic_receiver_t
*pubsub_tcpTopicReceiver_create(celix_bundle_context
}
if (receiver->socketHandler == NULL) {
+ celix_longHashMap_destroy(receiver->subscribers.map);
+ celix_stringHashMap_destroy(receiver->requestedConnections.map);
+ celixThreadMutex_destroy(&receiver->subscribers.mutex);
+ celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
+ celixThreadMutex_destroy(&receiver->thread.mutex);
+ pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
if (receiver->scope != NULL) {
free(receiver->scope);
}
@@ -252,6 +265,8 @@ pubsub_tcp_topic_receiver_t
*pubsub_tcpTopicReceiver_create(celix_bundle_context
void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
if (receiver != NULL) {
+ celix_bundleContext_stopTracker(receiver->ctx,
receiver->subscriberTrackerId);
+
celixThreadMutex_lock(&receiver->thread.mutex);
if (receiver->thread.running) {
receiver->thread.running = false;
@@ -259,34 +274,36 @@ void
pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
celixThread_join(receiver->thread.thread, NULL);
}
- celix_bundleContext_stopTracker(receiver->ctx,
receiver->subscriberTrackerId);
+ pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, NULL,
NULL);
+
pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, NULL,
NULL, NULL);
+ if ((receiver->socketHandler) && (receiver->sharedSocketHandler ==
NULL)) {
+ pubsub_tcpHandler_destroy(receiver->socketHandler);
+ receiver->socketHandler = NULL;
+ }
celixThreadMutex_lock(&receiver->subscribers.mutex);
- hashMap_destroy(receiver->subscribers.map, false, true);
+ CELIX_LONG_HASH_MAP_ITERATE(receiver->subscribers.map, iter) {
+ free(iter.value.ptrValue);
+ }
+ celix_longHashMap_destroy(receiver->subscribers.map);
celixThreadMutex_unlock(&receiver->subscribers.mutex);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
- hash_map_iterator_t iter =
hashMapIterator_construct(receiver->requestedConnections.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_requested_connection_entry_t *entry =
hashMapIterator_nextValue(&iter);
+
+ CELIX_STRING_HASH_MAP_ITERATE(receiver->requestedConnections.map,
iter) {
+ psa_tcp_requested_connection_entry_t *entry = iter.value.ptrValue;
if (entry != NULL) {
free(entry->url);
free(entry);
}
}
- hashMap_destroy(receiver->requestedConnections.map, false, false);
+ celix_stringHashMap_destroy(receiver->requestedConnections.map);
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
celixThreadMutex_destroy(&receiver->subscribers.mutex);
celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
celixThreadMutex_destroy(&receiver->thread.mutex);
- pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, NULL,
NULL);
-
pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, NULL,
NULL, NULL);
- if ((receiver->socketHandler) && (receiver->sharedSocketHandler ==
NULL)) {
- pubsub_tcpHandler_destroy(receiver->socketHandler);
- receiver->socketHandler = NULL;
- }
pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
if (receiver->scope != NULL) {
free(receiver->scope);
@@ -326,9 +343,8 @@ void
pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiv
}
free(interface_url);
} else {
- hash_map_iterator_t iter =
hashMapIterator_construct(receiver->requestedConnections.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_requested_connection_entry_t *entry =
hashMapIterator_nextValue(&iter);
+ CELIX_STRING_HASH_MAP_ITERATE(receiver->requestedConnections.map,
iter) {
+ psa_tcp_requested_connection_entry_t *entry = iter.value.ptrValue;
char *url = NULL;
asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)"
: "");
if (entry->connected) {
@@ -354,14 +370,14 @@ void pubsub_tcpTopicReceiver_connectTo(
url);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
- psa_tcp_requested_connection_entry_t *entry =
hashMap_get(receiver->requestedConnections.map, url);
+ psa_tcp_requested_connection_entry_t *entry =
celix_stringHashMap_get(receiver->requestedConnections.map, url);
if (entry == NULL) {
entry = calloc(1, sizeof(*entry));
entry->url = celix_utils_strdup(url);
entry->connected = false;
entry->statically = false;
entry->parent = receiver;
- hashMap_put(receiver->requestedConnections.map, (void *) entry->url,
entry);
+ celix_stringHashMap_put(receiver->requestedConnections.map, (void *)
entry->url, entry);
receiver->requestedConnections.allConnected = false;
}
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
@@ -376,13 +392,14 @@ void
pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receive
url);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
- psa_tcp_requested_connection_entry_t *entry =
hashMap_remove(receiver->requestedConnections.map, url);
+ psa_tcp_requested_connection_entry_t *entry =
celix_stringHashMap_get(receiver->requestedConnections.map, url);
if (entry != NULL) {
int rc = pubsub_tcpHandler_disconnect(receiver->socketHandler,
entry->url);
if (rc < 0)
L_WARN("[PSA_TCP] Error disconnecting from tcp url %s. (%s)", url,
strerror(errno));
}
if (entry != NULL) {
+ celix_stringHashMap_remove(receiver->requestedConnections.map, url);
free(entry->url);
free(entry);
}
@@ -413,7 +430,7 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void
*handle, void *svc, const
entry->initialized = false;
celixThreadMutex_lock(&receiver->subscribers.mutex);
- hashMap_put(receiver->subscribers.map, (void*)svcId, entry);
+ celix_longHashMap_put(receiver->subscribers.map, svcId, entry);
receiver->subscribers.allInitialized = false;
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
@@ -424,7 +441,8 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void
*handle, void *svc __a
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID,
-1);
celixThreadMutex_lock(&receiver->subscribers.mutex);
- psa_tcp_subscriber_entry_t *entry =
hashMap_remove(receiver->subscribers.map, (void *)svcId);
+ psa_tcp_subscriber_entry_t *entry =
celix_longHashMap_get(receiver->subscribers.map, svcId);
+ celix_longHashMap_remove(receiver->subscribers.map, svcId);
free(entry);
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
@@ -432,12 +450,13 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void
*handle, void *svc __a
static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char*
msgFqn, const pubsub_protocol_message_t *message, void** msg, bool* release,
const celix_properties_t* metadata) {
*release = true;
celixThreadMutex_lock(&receiver->subscribers.mutex);
- hash_map_iterator_t iter =
hashMapIterator_construct(receiver->subscribers.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter);
+ celix_long_hash_map_iterator_t iter =
celix_longHashMap_begin(receiver->subscribers.map);
+ while(!celix_longHashMapIterator_isEnd(&iter)) {
+ psa_tcp_subscriber_entry_t* entry = iter.value.ptrValue;
+ celix_longHashMapIterator_next(&iter);
if (entry != NULL && entry->subscriberSvc->receive != NULL) {
entry->subscriberSvc->receive(entry->subscriberSvc->handle,
msgFqn, message->header.msgId, *msg, metadata, release);
- if (!(*release) && hashMapIterator_hasNext(&iter)) { //receive
function has taken ownership, deserialize again for new message
+ if (!(*release) && !celix_longHashMapIterator_isEnd(&iter)) {
//receive function has taken ownership, deserialize again for new message
struct iovec deSerializeBuffer;
deSerializeBuffer.iov_base = message->payload.payload;
deSerializeBuffer.iov_len = message->payload.length;
@@ -578,9 +597,8 @@ static void
psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
if (!receiver->requestedConnections.allConnected) {
bool allConnected = true;
- hash_map_iterator_t iter =
hashMapIterator_construct(receiver->requestedConnections.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_requested_connection_entry_t *entry =
hashMapIterator_nextValue(&iter);
+ CELIX_STRING_HASH_MAP_ITERATE(receiver->requestedConnections.map,
iter) {
+ psa_tcp_requested_connection_entry_t *entry = iter.value.ptrValue;
if ((entry) && (!entry->connected) && (!receiver->isPassive)) {
int rc =
pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url);
if (rc < 0) {
@@ -601,13 +619,13 @@ static void psa_tcp_connectHandler(void *handle, const
char *url, bool lock) {
url);
if (lock)
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
- psa_tcp_requested_connection_entry_t *entry =
hashMap_get(receiver->requestedConnections.map, url);
+ psa_tcp_requested_connection_entry_t *entry =
celix_stringHashMap_get(receiver->requestedConnections.map, url);
if (entry == NULL) {
entry = calloc(1, sizeof(*entry));
entry->parent = receiver;
entry->url = celix_utils_strdup(url);
entry->statically = true;
- hashMap_put(receiver->requestedConnections.map, (void *) entry->url,
entry);
+ celix_stringHashMap_put(receiver->requestedConnections.map, (void *)
entry->url, entry);
receiver->requestedConnections.allConnected = false;
}
entry->connected = true;
@@ -623,7 +641,7 @@ static void psa_tcp_disConnectHandler(void *handle, const
char *url, bool lock)
url);
if (lock)
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
- psa_tcp_requested_connection_entry_t *entry =
hashMap_get(receiver->requestedConnections.map, url);
+ psa_tcp_requested_connection_entry_t *entry =
celix_stringHashMap_get(receiver->requestedConnections.map, url);
if (entry != NULL) {
entry->connected = false;
receiver->requestedConnections.allConnected = false;
@@ -636,9 +654,8 @@ static void
psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv
celixThreadMutex_lock(&receiver->subscribers.mutex);
if (!receiver->subscribers.allInitialized) {
bool allInitialized = true;
- hash_map_iterator_t iter =
hashMapIterator_construct(receiver->subscribers.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_subscriber_entry_t *entry =
hashMapIterator_nextValue(&iter);
+ CELIX_LONG_HASH_MAP_ITERATE(receiver->subscribers.map, iter) {
+ psa_tcp_subscriber_entry_t *entry = iter.value.ptrValue;
if (!entry->initialized) {
int rc = 0;
if (entry->subscriberSvc->init != NULL) {