This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/feature/pubsub-interceptor-fix
by this push:
new 90497a9 Enables pubsub tcp interceptor tests and fixes mem leaks
90497a9 is described below
commit 90497a9037cf9e43e0702779da570a5adbcbbdd2
Author: Pepijn Noltes <[email protected]>
AuthorDate: Tue Jul 27 20:20:47 2021 +0200
Enables pubsub tcp interceptor tests and fixes mem leaks
---
bundles/pubsub/integration/CMakeLists.txt | 5 ++---
bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_handler.c | 9 ++++++++-
.../pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c | 5 +++++
3 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/bundles/pubsub/integration/CMakeLists.txt b/bundles/pubsub/integration/CMakeLists.txt
index 59207a7..7ff5fe1 100644
--- a/bundles/pubsub/integration/CMakeLists.txt
+++ b/bundles/pubsub/integration/CMakeLists.txt
@@ -838,9 +838,8 @@ if (BUILD_PUBSUB_PSA_WS)
endif ()
if (BUILD_PUBSUB_PSA_TCP)
- message(STATUS "TODO enable tcp and interceptors. Currently has a memleak")
- #add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v1_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v1)
- #add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v2_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v2)
+ add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v1_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v1)
+ add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v2_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v2)
endif ()
if (BUILD_PUBSUB_PSA_ZMQ)
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_handler.c
index 278ebd3..eb02f79 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_handler.c
@@ -855,8 +855,12 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
clock_gettime(CLOCK_REALTIME, &receiveTime);
bool releaseEntryBuffer = false;
handle->processMessageCallback(handle->processMessagePayload, &entry->header, &releaseEntryBuffer, &receiveTime);
- if (releaseEntryBuffer) pubsub_tcpHandler_releaseEntryBuffer(handle, entry->fd, 0);
+ if (releaseEntryBuffer) {
+ pubsub_tcpHandler_releaseEntryBuffer(handle, entry->fd, 0);
+ }
}
+ celix_properties_destroy(entry->header.metadata.metadata);
+ entry->header.metadata.metadata = NULL;
}
static inline
@@ -1195,6 +1199,9 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
if (payloadData && (payloadData != message->payload.payload)) {
free(payloadData);
}
+ if (metadataData && metadataSize > 0) {
+ free(metadataData);
+ }
}
celixThreadMutex_unlock(&entry->writeMutex);
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index fe6fd53..6945975 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -487,6 +487,7 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes
if (status == CELIX_SUCCESS) {
celix_properties_t *metadata = message->metadata.metadata;
+ bool metadataWasNull = metadata == NULL;
bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, &metadata);
bool release = true;
if (cont) {
@@ -498,6 +499,10 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes
if (release) {
pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
}
+ if (metadataWasNull) {
+ //note that if the metadata was created by the pubsubInterceptorHandler_invokePreReceive, this needs to be deallocated
+ celix_properties_destroy(metadata);
+ }
} else {
L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
receiver->scope == NULL ? "(null)" : receiver->scope,
receiver->topic);