This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/pubsub-interceptor-fix in repository https://gitbox.apache.org/repos/asf/celix.git
commit 10117c98dd32861eca75dfc6f261d7f710f0957a Author: Pepijn Noltes <[email protected]> AuthorDate: Sun Jun 27 16:40:02 2021 +0200 Adds initial PubSubInterceptorTestSuite --- .../gtest/PubSubInterceptorTestSuite.cc | 100 +++++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc new file mode 100644 index 0000000..181eff3 --- /dev/null +++ b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <gtest/gtest.h> + +#include "pubsub_serializer_handler.h" +#include "celix/FrameworkFactory.h" +#include "msg.h" +#include "pubsub_interceptor.h" + +class PubSubInterceptorTestSuite : public ::testing::Test { +public: + PubSubInterceptorTestSuite() { + fw = celix::createFramework({ + {"CELIX_PUBSUB_TEST_ADD_METADATA", "true"} /*TODO memleak in pubsub zmq v2 when metadata is empty*/ + }); + ctx = fw->getFrameworkBundleContext(); + + EXPECT_GE(ctx->installBundle(PUBSUB_JSON_BUNDLE_FILE), 0); + EXPECT_GE(ctx->installBundle(PUBSUB_TOPMAN_BUNDLE_FILE), 0); + EXPECT_GE(ctx->installBundle(PUBSUB_ZMQ_BUNDLE_FILE), 0); + EXPECT_GE(ctx->installBundle(PUBSUB_WIRE_BUNDLE_FILE), 0); + } + + std::shared_ptr<celix::Framework> fw{}; + std::shared_ptr<celix::BundleContext> ctx{}; +}; + +static void serializeAndPrint(pubsub_serializer_handler_t* ser, uint32_t msgId, const void *msg) { + struct iovec* vec = nullptr; + size_t vecLen = 0; + pubsub_serializerHandler_serialize(ser, msgId, msg, &vec, &vecLen); + if (vecLen > 0) { + for (size_t i = 0; i < vecLen; ++i) { + fwrite(vec[i].iov_base, sizeof(char), vec[i].iov_len, stdout); + } + } + fputc('\n', stdout); + pubsub_serializerHandler_freeSerializedMsg(ser, msgId, vec, vecLen); +} + +std::shared_ptr<celix::ServiceRegistration> createInterceptor(std::shared_ptr<celix::BundleContext>& ctx) { + auto interceptor = std::shared_ptr<pubsub_interceptor>{new pubsub_interceptor{}, [](pubsub_interceptor* inter) { + auto* handler = (pubsub_serializer_handler_t*)inter->handle; + pubsub_serializerHandler_destroy(handler); + delete inter; + }}; + interceptor->handle = pubsub_serializerHandler_create(ctx->getCBundleContext(), "json", true); + interceptor->postSend = [](void *handle, pubsub_interceptor_properties_t *, const char *msgType, uint32_t msgId, const void *rawMsg, + const celix_properties_t *) { + auto* ser = (pubsub_serializer_handler_t*)handle; + serializeAndPrint(ser, msgId, rawMsg); + EXPECT_STREQ(msgType, "msg"); + const auto *msg = static_cast<const msg_t*>(rawMsg); + EXPECT_GE(msg->seqNr, 0); + fprintf(stdout, "Got message in postSend interceptor %p with seq nr %i\n", handle, msg->seqNr); + }; + interceptor->postReceive = [](void *handle, pubsub_interceptor_properties_t *, const char *msgType, uint32_t msgId, const void *rawMsg, + const celix_properties_t *) { + auto* ser = (pubsub_serializer_handler_t*)handle; + serializeAndPrint(ser, msgId, rawMsg); + EXPECT_STREQ(msgType, "msg"); + const auto *msg = static_cast<const msg_t*>(rawMsg); + EXPECT_GE(msg->seqNr, 0); + fprintf(stdout, "Got message in postReceive interceptor %p with seq nr %i\n", handle, msg->seqNr); + }; + //note registering identical services to validate multiple interceptors + return ctx->registerService<pubsub_interceptor>(interceptor, PUBSUB_INTERCEPTOR_SERVICE_NAME).build(); +} + +TEST_F(PubSubInterceptorTestSuite, InterceptorWithSinglePublishersAndMultipleReceivers) { + //Given a publisher (PUBSUB_PUBLISHER_BUNDLE_FILE) and 2 receivers (PUBSUB_SUBSCRIBER_BUNDLE_FILE) + //And a registered interceptor + //Then the interceptor receives a correct msg type. + + EXPECT_GE(ctx->installBundle(PUBSUB_PUBLISHER_BUNDLE_FILE), 0); + EXPECT_GE(ctx->installBundle(PUBSUB_SUBSCRIBER_BUNDLE_FILE), 0); + + auto reg1 = createInterceptor(ctx); + auto reg2 = createInterceptor(ctx); + auto reg3 = createInterceptor(ctx); + + sleep(5); +}
