Oipo commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544253608



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
##########
@@ -559,31 +549,39 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t 
*receiver, psa_tcp_subs
             }
 
             if (status == CELIX_SUCCESS) {
-                hash_map_iterator_t iter = 
hashMapIterator_construct(entry->subscriberServices);
+                const char *msgType = msgSer->msgName;
+                uint32_t msgId = message->header.msgId;
+                celix_properties_t *metadata = message->metadata.metadata;
+                bool cont = 
pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, 
msgType, msgId, deSerializedMsg, &metadata);
                 bool release = true;
-                while (hashMapIterator_hasNext(&iter)) {
-                    pubsub_subscriber_t *svc = 
hashMapIterator_nextValue(&iter);
-                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, 
deSerializedMsg, message->metadata.metadata,
-                                 &release);
-                    if (!release && hashMapIterator_hasNext(&iter)) {
-                        //receive function has taken ownership and still more 
receive function to come ..
-                        //deserialize again for new message
-                        status = msgSer->deserialize(msgSer->handle, 
&deSerializeBuffer, 1, &deSerializedMsg);
-                        if (status != CELIX_SUCCESS) {
-                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type 
%s for scope/topic %s/%s", msgSer->msgName,
-                                   receiver->scope == NULL ? "(null)" : 
receiver->scope, receiver->topic);
-                            break;
+                if (cont) {
+                    hash_map_iterator_t iter = 
hashMapIterator_construct(entry->subscriberServices);
+                    while (hashMapIterator_hasNext(&iter)) {
+                        pubsub_subscriber_t *svc = 
hashMapIterator_nextValue(&iter);
+                        svc->receive(svc->handle, msgSer->msgName, 
msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
+                        
pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, 
msgType, msgId, deSerializedMsg, metadata);

Review comment:
       @pnoltes the zmq admin also calls the post receive in this while loop. 
But the svc that has received the message is not passed to the interceptor, so 
what is the use?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to