This is an automated email from the ASF dual-hosted git repository.
erjanaltena pushed a commit to branch nanomsg
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/nanomsg by this push:
new b53498a Nanomsg: added LogHelper class
b53498a is described below
commit b53498a8c632d0857da1ab059fe4bfc87f26e9ba
Author: erjan altena <[email protected]>
AuthorDate: Sun Dec 23 11:23:46 2018 +0100
Nanomsg: added LogHelper class
---
.../device_access/device_access/src/activator.c | 0
.../pubsub/pubsub_admin_nanomsg/src/LogHelper.h | 112 +++++++++++++++++++++
.../src/psa_nanomsg_activator.cc | 43 +-------
.../src/pubsub_nanomsg_admin.cc | 75 ++++++--------
.../src/pubsub_nanomsg_admin.h | 16 +--
.../src/pubsub_nanomsg_common.h | 3 +-
.../src/pubsub_nanomsg_topic_receiver.cc | 62 +++++-------
.../src/pubsub_nanomsg_topic_receiver.h | 6 +-
.../src/pubsub_nanomsg_topic_sender.cc | 73 +++-----------
.../src/pubsub_nanomsg_topic_sender.h | 9 +-
doap/doap_Celix.rdf | 0
11 files changed, 204 insertions(+), 195 deletions(-)
diff --git a/bundles/device_access/device_access/src/activator.c
b/bundles/device_access/device_access/src/activator.c
old mode 100755
new mode 100644
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/LogHelper.h
b/bundles/pubsub/pubsub_admin_nanomsg/src/LogHelper.h
new file mode 100644
index 0000000..d14ebc3
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/LogHelper.h
@@ -0,0 +1,112 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#pragma once
+#include <sstream>
+#include "log_helper.h"
+
+namespace celix {
+ namespace pubsub {
+ namespace nanomsg {
+
+// template<typename... Args>
+// void L_DEBUG(log_helper_t *logHelper, Args... args) {
+// std::stringstream ss = LOG_STREAM(args...);
+// logHelper_log(logHelper, OSGI_LOGSERVICE_DEBUG,
ss.str().c_str());
+// }
+//
+// template<typename... Args>
+// void L_INFO(log_helper_t *logHelper, Args... args) {
+// auto ss = LOG_STREAM(args...);
+// logHelper_log(logHelper, OSGI_LOGSERVICE_INFO,
ss.str().c_str());
+// }
+//
+// template<typename... Args>
+// void L_WARN(log_helper_t *logHelper, Args... args) {
+// auto ss = LOG_STREAM(args...);
+// logHelper_log(logHelper, OSGI_LOGSERVICE_WARNING,
ss.str().c_str());
+// }
+//
+// template<typename... Args>
+// void L_ERROR(log_helper_t *logHelper, Args... args) {
+// auto ss = LOG_STREAM(args...);
+// logHelper_log((log_helper_pt) logHelper,
OSGI_LOGSERVICE_ERROR, ss.str().c_str());
+// }
+
+ /**
+  * Thin C++ wrapper around the C log_helper_t API. Each logging call streams
+  * its arguments (operator<<) into one message and hands it to logHelper_log.
+  *
+  * Ownership: an instance constructed from a bundle context creates and
+  * starts its own log_helper_t and stops/destroys it again on destruction.
+  * An instance constructed from an existing log_helper_t*, or copied from
+  * another LogHelper, only borrows the handle and never stops or destroys
+  * it; the owning instance must therefore outlive every borrowed copy.
+  */
+ class LogHelper {
+ public:
+     /** Borrow an existing helper; it is neither stopped nor destroyed here. */
+     LogHelper(log_helper_t *lh) : _logHelper{lh} {
+     }
+
+     /**
+      * Copies share the handle but never own it. Copying the ownership flag
+      * (as a defaulted copy would) makes every copy stop and destroy the
+      * same log_helper_t in its destructor.
+      */
+     LogHelper(const LogHelper &other) : helperCreated{false}, _logHelper{other._logHelper} {
+     }
+
+     /**
+      * Releases a helper owned by this instance, then borrows other's handle.
+      * Nothing happens when both already refer to the same handle (this also
+      * covers self-assignment), so an owner never destroys a handle it is
+      * about to keep using.
+      */
+     LogHelper& operator=(const LogHelper &other) {
+         if (_logHelper != other._logHelper) {
+             release();
+             _logHelper = other._logHelper;
+         }
+         return *this;
+     }
+
+     /** Create and start a helper that is owned by this instance. */
+     LogHelper(bundle_context_pt ctx) {
+         logHelper_create(ctx, &_logHelper);
+         // Take ownership only when a handle was actually produced, so a
+         // failed create is not followed by start/stop/destroy on a null handle.
+         if (_logHelper != nullptr) {
+             helperCreated = true;
+             logHelper_start(_logHelper);
+         }
+     }
+
+     ~LogHelper() {
+         release();
+     }
+
+     /** Log the concatenation of all arguments at error level. */
+     template<typename... Args>
+     void ERROR(const Args&... args) {
+         write(OSGI_LOGSERVICE_ERROR, args...);
+     }
+
+     /** Log the concatenation of all arguments at warning level. */
+     template<typename... Args>
+     void WARN(const Args&... args) {
+         write(OSGI_LOGSERVICE_WARNING, args...);
+     }
+
+     /** Log the concatenation of all arguments at info level. */
+     template<typename... Args>
+     void INFO(const Args&... args) {
+         write(OSGI_LOGSERVICE_INFO, args...);
+     }
+
+     /** Log the concatenation of all arguments at debug level. */
+     template<typename... Args>
+     void DBG(const Args&... args) {
+         write(OSGI_LOGSERVICE_DEBUG, args...);
+     }
+
+ private:
+     bool helperCreated{false};
+     log_helper_t *_logHelper{};
+
+     /** Stop and destroy the helper, but only if this instance created it. */
+     void release() {
+         if (helperCreated) {
+             logHelper_stop(_logHelper);
+             logHelper_destroy(&_logHelper);
+             helperCreated = false;
+         }
+     }
+
+     /** Stream one value into the message. */
+     template<typename T>
+     static void append(std::ostringstream &out, const T &value) {
+         out << value;
+     }
+
+     /** C strings may be null (e.g. a missing url); print a marker instead. */
+     static void append(std::ostringstream &out, const char *text) {
+         out << (text != nullptr ? text : "(null)");
+     }
+
+     static void append(std::ostringstream &out, char *text) {
+         append(out, static_cast<const char *>(text));
+     }
+
+     /**
+      * Build the message in a single stream and emit it at the given level.
+      * logHelper_log takes a printf-style format, so the assembled text is
+      * passed as the argument of a fixed "%s" format; a '%' inside the text
+      * (url, strerror output, ...) is then never read as a conversion.
+      */
+     template<typename Level, typename... Args>
+     void write(Level level, const Args&... args) {
+         std::ostringstream message;
+         // C++11 pack expansion: appends the arguments left to right.
+         const int expand[] = {0, (append(message, args), 0)...};
+         static_cast<void>(expand);
+         logHelper_log(_logHelper, level, "%s", message.str().c_str());
+     }
+
+ };
+
+ }
+ }
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
index e599f01..f249a06 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
@@ -23,45 +23,14 @@
#include <iostream>
#include "celix_api.h"
#include "pubsub_serializer.h"
-#include "log_helper.h"
-
+#include "LogHelper.h"
#include "pubsub_admin.h"
#include "pubsub_nanomsg_admin.h"
-class LogHelper {
-public:
- LogHelper(celix_bundle_context_t *ctx) : context{ctx} {
- if (logHelper_create(context, &logHelper)!= CELIX_SUCCESS) {
- std::bad_alloc{};
- }
-
- }
- ~LogHelper() {
- logHelper_destroy(&logHelper);
- }
-
- LogHelper(const LogHelper &) = delete;
- LogHelper & operator=(const LogHelper&) = delete;
- celix_status_t start () {
- return logHelper_start(logHelper);
- }
-
- celix_status_t stop () {
- return logHelper_stop(logHelper);
- }
-
- log_helper_t *get() {
- return logHelper;
- }
-private:
- celix_bundle_context_t *context;
- log_helper_t *logHelper{};
-
-};
class psa_nanomsg_activator {
public:
- psa_nanomsg_activator(celix_bundle_context_t *ctx) : context{ctx},
logHelper{context}, admin(context, logHelper.get()) {
+ psa_nanomsg_activator(celix_bundle_context_t *ctx) : context{ctx},
L{context}, admin(context, L) {
}
psa_nanomsg_activator(const psa_nanomsg_activator&) = delete;
psa_nanomsg_activator& operator=(const psa_nanomsg_activator&) = delete;
@@ -72,19 +41,17 @@ public:
celix_status_t start() {
admin.start();
- auto status = logHelper.start();
-
- return status;
+ return CELIX_SUCCESS;
}
celix_status_t stop() {
admin.stop();
- return logHelper.stop();
+ return CELIX_SUCCESS;
};
private:
celix_bundle_context_t *context{};
- LogHelper logHelper;
+ celix::pubsub::nanomsg::LogHelper L;
pubsub_nanomsg_admin admin;
};
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index ba47723..1a01ebf 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -34,22 +34,12 @@
#include "pubsub_nanomsg_admin.h"
#include "pubsub_psa_nanomsg_constants.h"
-#define L_DEBUG(...) \
- logHelper_log(log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
-#define L_INFO(...) \
- logHelper_log(log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
-#define L_WARN(...) \
- logHelper_log(log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
-#define L_ERROR(...) \
- logHelper_log(log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
-
-
static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip);
-pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx,
log_helper_t *logHelper):
+pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx,
celix::pubsub::nanomsg::LogHelper& logHelper):
ctx{_ctx},
- log{logHelper} {
+ L{logHelper} {
verbose = celix_bundleContext_getPropertyAsBool(ctx,
PUBSUB_NANOMSG_VERBOSE_KEY, PUBSUB_NANOMSG_VERBOSE_DEFAULT);
fwUUID = celix_bundleContext_getProperty(ctx,
OSGI_FRAMEWORK_FRAMEWORK_UUID, nullptr);
@@ -70,13 +60,13 @@
pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx, log_hel
}
if (ip == nullptr) {
- L_WARN("[PSA_NANOMSG] Could not determine IP address for PSA, using
default ip (%s)", PUBSUB_NANOMSG_DEFAULT_IP);
+ L.WARN("[PSA_NANOMSG] Could not determine IP address for PSA, using
default ip (", PUBSUB_NANOMSG_DEFAULT_IP, ")");
ip = strndup(PUBSUB_NANOMSG_DEFAULT_IP, 1024);
}
ipAddress = ip;
if (verbose) {
- L_INFO("[PSA_NANOMSG] Using %s for service annunciation", ip);
+ L.INFO("[PSA_NANOMSG] Using ", ip, " for service annunciation.");
}
@@ -85,7 +75,7 @@
pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx, log_hel
basePort = (unsigned int)_basePort;
maxPort = (unsigned int)_maxPort;
if (verbose) {
- L_INFO("[PSA_NANOMSG] Using base till max port: %li till %li",
_basePort, _maxPort);
+ L.INFO("[PSA_NANOMSG] Using base till max port: ", _basePort, " till
", _maxPort);
}
@@ -214,7 +204,7 @@ void pubsub_nanomsg_admin::addSerializerSvc(void *svc,
const celix_properties_t
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID,
-1L);
if (serType == nullptr) {
- L_INFO("[PSA_NANOMSG] Ignoring serializer service without %s
property", PUBSUB_SERIALIZER_TYPE_KEY);
+ L.INFO("[PSA_NANOMSG] Ignoring serializer service without ",
PUBSUB_SERIALIZER_TYPE_KEY, " property");
return;
}
@@ -273,7 +263,7 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void
*/*svc*/, const celix_proper
celix_status_t pubsub_nanomsg_admin::matchPublisher(long svcRequesterBndId,
const celix_filter_t *svcFilter,
double *outScore, long
*outSerializerSvcId) {
- L_DEBUG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchPublisher");
+ L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchPublisher");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchPublisher(ctx, svcRequesterBndId,
svcFilter->filterStr, PUBSUB_NANOMSG_ADMIN_TYPE,
qosSampleScore, qosControlScore, defaultScore, outSerializerSvcId);
@@ -285,7 +275,7 @@ celix_status_t pubsub_nanomsg_admin::matchPublisher(long
svcRequesterBndId, cons
celix_status_t pubsub_nanomsg_admin::matchSubscriber(long svcProviderBndId,
const celix_properties_t
*svcProperties, double *outScore,
long *outSerializerSvcId) {
- L_DEBUG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchSubscriber");
+ L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchSubscriber");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchSubscriber(ctx, svcProviderBndId,
svcProperties, PUBSUB_NANOMSG_ADMIN_TYPE,
qosSampleScore, qosControlScore, defaultScore, outSerializerSvcId);
@@ -296,7 +286,7 @@ celix_status_t pubsub_nanomsg_admin::matchSubscriber(long
svcProviderBndId,
}
celix_status_t pubsub_nanomsg_admin::matchEndpoint(const celix_properties_t
*endpoint, bool *outMatch) {
- L_DEBUG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchEndpoint");
+ L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchEndpoint");
celix_status_t status = CELIX_SUCCESS;
bool match = pubsub_utils_matchEndpoint(ctx, endpoint,
PUBSUB_NANOMSG_ADMIN_TYPE, nullptr);
if (outMatch != nullptr) {
@@ -314,45 +304,37 @@ celix_status_t
pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
//3) Connect existing endpoints
//4) set outPublisherEndpoint
- celix_properties_t *newEndpoint = nullptr;
-
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
auto sender = topicSenders.map.find(key);
if (sender == topicSenders.map.end()) {
- psa_nanomsg_serializer_entry_t *serEntry = nullptr;
+ //psa_nanomsg_serializer_entry *serEntry = nullptr;
auto kv = serializers.map.find(serializerSvcId);
if (kv != serializers.map.end()) {
- serEntry = &kv->second;
- }
- if (serEntry != nullptr) {
+ auto &serEntry = kv->second;
auto e = topicSenders.map.emplace(std::piecewise_construct,
std::forward_as_tuple(key),
- std::forward_as_tuple(ctx, log, scope, topic,
serializerSvcId, serEntry->svc, ipAddress,
+ std::forward_as_tuple(ctx, L, scope, topic,
serializerSvcId, serEntry.svc, ipAddress,
basePort, maxPort));
- const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
- const char *serType = serEntry->serType;
- newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic,
PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType,
- nullptr);
+ celix_properties_t *newEndpoint = pubsubEndpoint_create(fwUUID,
scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE,
+ PUBSUB_NANOMSG_ADMIN_TYPE, serEntry.serType, nullptr);
celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY,
e.first->second.getUrl().c_str());
//if available also set container name
const char *cn = celix_bundleContext_getProperty(ctx,
"CELIX_CONTAINER_NAME", nullptr);
if (cn != nullptr) {
celix_properties_set(newEndpoint, "container_name", cn);
}
+ if (newEndpoint != nullptr && outPublisherEndpoint != nullptr) {
+ *outPublisherEndpoint = newEndpoint;
+ }
} else {
- L_ERROR("[PSA NANOMSG] Error creating a TopicSender");
- free(key);
+ L.ERROR("[PSA NANOMSG] Error creating a TopicSender");
}
} else {
- free(key);
- L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for
scope/topic %s/%s!", scope, topic);
- }
-
- if (newEndpoint != nullptr && outPublisherEndpoint != nullptr) {
- *outPublisherEndpoint = newEndpoint;
+ L.ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for
scope/topic ", scope,"/", topic);
}
+ free(key);
return status;
}
@@ -365,9 +347,8 @@ celix_status_t
pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
- ;
if (topicSenders.map.erase(key) == 0) {
- L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic
%s/%s. Does not exists", scope, topic);
+ L.ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic ",
scope, "/", topic, " Does not exists");
}
free(key);
@@ -379,7 +360,7 @@ celix_status_t
pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope
celix_properties_t *newEndpoint = nullptr;
- std::string key = pubsubEndpoint_createScopeTopicKey(scope.c_str(),
topic.c_str());
+ auto key = pubsubEndpoint_createScopeTopicKey(scope.c_str(),
topic.c_str());
pubsub::nanomsg::topic_receiver * receiver = nullptr;
{
std::lock_guard<std::mutex> serializerLock(serializers.mutex);
@@ -392,9 +373,9 @@ celix_status_t
pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope
auto kvs = serializers.map.find(serializerSvcId);
if (kvs != serializers.map.end()) {
auto serEntry = kvs->second;
- receiver = new pubsub::nanomsg::topic_receiver(ctx, log,
scope, topic, serializerSvcId, serEntry.svc);
+ receiver = new pubsub::nanomsg::topic_receiver(ctx, L, scope,
topic, serializerSvcId, serEntry.svc);
} else {
- L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender
%s/%s", scope.c_str(), topic.c_str());
+ L.ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender
", scope, "/", topic);
}
if (receiver != nullptr) {
const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
@@ -408,10 +389,10 @@ celix_status_t
pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope
}
topicReceivers.map[key] = receiver;
} else {
- L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
+ L.ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
}
} else {
- L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver
for scope/topic %s/%s!", scope.c_str(), topic.c_str());
+ L.ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver
for scope/topic ", scope, "/", topic);
}
}
if (receiver != nullptr && newEndpoint != nullptr) {
@@ -428,7 +409,7 @@ celix_status_t
pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope
if (newEndpoint != nullptr && outSubscriberEndpoint != nullptr) {
*outSubscriberEndpoint = newEndpoint;
}
-
+ free(key);
celix_status_t status = CELIX_SUCCESS;
return status;
}
@@ -509,7 +490,7 @@ celix_status_t
pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub::nano
const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY,
nullptr);
if (url == nullptr) {
- L_WARN("[PSA NANOMSG] Error got endpoint without nanomsg url");
+ L.WARN("[PSA NANOMSG] Error got endpoint without nanomsg url");
status = CELIX_BUNDLE_EXCEPTION;
} else {
if ((eScope == scope) && (eTopic == topic)) {
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 8ed5ddd..1ac7fea 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -24,11 +24,10 @@
#include <map>
#include <pubsub_admin.h>
#include "celix_api.h"
-#include "log_helper.h"
#include "pubsub_nanomsg_topic_receiver.h"
#include <pubsub_serializer.h>
-
-#include "../../../shell/shell/include/command.h"
+#include "LogHelper.h"
+#include "command.h"
#include "pubsub_nanomsg_topic_sender.h"
#include "pubsub_nanomsg_topic_receiver.h"
@@ -51,7 +50,7 @@ struct ProtectedMap {
class pubsub_nanomsg_admin {
public:
- pubsub_nanomsg_admin(celix_bundle_context_t *ctx, log_helper_t *logHelper);
+ pubsub_nanomsg_admin(celix_bundle_context_t *ctx,
celix::pubsub::nanomsg::LogHelper& logHelper);
pubsub_nanomsg_admin(const pubsub_nanomsg_admin&) = delete;
pubsub_nanomsg_admin& operator=(const pubsub_nanomsg_admin&) = delete;
~pubsub_nanomsg_admin();
@@ -88,7 +87,7 @@ private:
celix_status_t
disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver,
const
celix_properties_t *endpoint);
celix_bundle_context_t *ctx;
- log_helper_t *log;
+ celix::pubsub::nanomsg::LogHelper L;
pubsub_admin_service_t adminService{};
long adminSvcId = -1L;
long cmdSvcId = -1L;
@@ -108,7 +107,8 @@ private:
bool verbose{};
- typedef struct psa_nanomsg_serializer_entry {
+ class psa_nanomsg_serializer_entry {
+ public:
psa_nanomsg_serializer_entry(const char*_serType, long _svcId,
pubsub_serializer_service_t *_svc) :
serType{_serType}, svcId{_svcId}, svc{_svc} {
@@ -117,8 +117,8 @@ private:
const char *serType;
long svcId;
pubsub_serializer_service_t *svc;
- } psa_nanomsg_serializer_entry_t;
- ProtectedMap<long, psa_nanomsg_serializer_entry_t> serializers{};
+ };
+ ProtectedMap<long, psa_nanomsg_serializer_entry> serializers{};
ProtectedMap<std::string, pubsub::nanomsg::pubsub_nanomsg_topic_sender>
topicSenders{};
ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*>
topicReceivers{};
ProtectedMap<const std::string, celix_properties_t *>
discoveredEndpoints{};
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
index 2c03d4c..b3c8d25 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
@@ -21,11 +21,12 @@
#define CELIX_PUBSUB_ZMQ_COMMON_H
#include <string>
+#include <sstream>
#include <utils.h>
#include "version.h"
#include "pubsub_common.h"
-
+#include "log_helper.h"
/*
* NOTE zmq is used by first sending three frames:
diff --git
a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 83d1caf..4df5aae 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -37,7 +37,7 @@
#include <pubsub/subscriber.h>
#include <pubsub_constants.h>
#include <pubsub_endpoint.h>
-#include <log_helper.h>
+#include <LogHelper.h>
#include "pubsub_nanomsg_topic_receiver.h"
#include "pubsub_psa_nanomsg_constants.h"
@@ -45,7 +45,7 @@
#include "pubsub_topology_manager.h"
//TODO see if block and wakeup (reset) also works
-#define PSA_NANOMSG_RECV_TIMEOUT 1000
+#define PSA_NANOMSG_RECV_TIMEOUT 100 //100 msec timeout
/*
#define L_DEBUG(...) \
@@ -57,31 +57,30 @@
#define L_ERROR(...) \
logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
*/
-#define L_DEBUG printf
-#define L_INFO printf
-#define L_WARN printf
-#define L_ERROR printf
+//#define L_DEBUG printf
+//#define L_INFO printf
+//#define L_WARN printf
+//#define L_ERROR printf
pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
- log_helper_t *_logHelper,
+ celix::pubsub::nanomsg::LogHelper& _logHelper,
const std::string &_scope,
const std::string &_topic,
long _serializerSvcId,
- pubsub_serializer_service_t *_serializer) :
m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} {
+ pubsub_serializer_service_t *_serializer) : L{_logHelper},
m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} {
ctx = _ctx;
- logHelper = _logHelper;
serializer = _serializer;
m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
if (m_nanoMsgSocket < 0) {
- L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s",
m_scope.c_str(), m_topic.c_str());
+ L.ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for scope/topic: ",
m_scope.c_str(), "/", m_topic.c_str());
std::bad_alloc{};
} else {
int timeout = PSA_NANOMSG_RECV_TIMEOUT;
if (nn_setsockopt(m_nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO,
&timeout,
sizeof (timeout)) < 0) {
- L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set
sockopt RECV_TIMEO failed", m_scope.c_str(), m_topic.c_str());
+ L.ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for ",m_scope,
"/",m_topic, ", set sockopt RECV_TIMEO failed");
std::bad_alloc{};
}
@@ -91,7 +90,7 @@
pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
subscriberTrackerId =
celix_bundleContext_trackServicesWithOptions(ctx, &opts);
recvThread.running = true;
-
+ free ((void*)opts.filter.filter);
recvThread.thread = std::thread([this]() {this->recvThread_exec();});
}
}
@@ -161,7 +160,7 @@ void
pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &
void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
- L_DEBUG("[PSA_NANOMSG] TopicReceiver %s/%s connecting to nanomsg url %s",
m_scope.c_str(), m_topic.c_str(), url);
+ L.DBG("[PSA_NANOMSG] TopicReceiver ", m_scope, "/", m_topic, " connecting
to nanomsg url ", url);
std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
auto entry = requestedConnections.map.find(url);
@@ -178,13 +177,13 @@ void pubsub::nanomsg::topic_receiver::connectTo(const
char *url) {
entry->second.setConnected(true);
entry->second.setId(connection_id);
} else {
- L_WARN("[PSA_NANOMSG] Error connecting to NANOMSG url %s. (%s)",
url, strerror(errno));
+ L.WARN("[PSA_NANOMSG] Error connecting to NANOMSG url ", url, "
(",strerror(errno), ")");
}
}
}
void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
- L_DEBUG("[PSA NANOMSG] TopicReceiver %s/%s disconnect from nanomsg url
%s", m_scope.c_str(), m_topic.c_str(), url);
+ L.DBG("[PSA NANOMSG] TopicReceiver ", m_scope, "/", m_topic, " disconnect
from nanomsg url ", url);
std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
auto entry = requestedConnections.map.find(url);
@@ -193,8 +192,7 @@ void pubsub::nanomsg::topic_receiver::disconnectFrom(const
char *url) {
if (nn_shutdown(m_nanoMsgSocket, entry->second.getId()) == 0) {
entry->second.setConnected(false);
} else {
- L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s,
id %d. (%s)", url, entry->second.getId(),
- strerror(errno));
+ L.WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url ",
url, ", id: ", entry->second.getId(), " (",strerror(errno),")");
}
}
requestedConnections.map.erase(url);
@@ -226,7 +224,7 @@ void pubsub::nanomsg::topic_receiver::addSubscriber(void
*svc, const celix_prope
int rc = serializer->createSerializerMap(serializer->handle,
(celix_bundle_t*)bnd, &entry->second.msgTypes);
if (rc != 0) {
- L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for
TopicReceiver %s/%s", m_scope.c_str(), m_topic.c_str());
+ L.ERROR("[PSA_NANOMSG] Cannot create msg serializer map for
TopicReceiver ", m_scope.c_str(), "/", m_topic.c_str());
subscribers.map.erase(bndId);
}
}
@@ -244,7 +242,7 @@ void pubsub::nanomsg::topic_receiver::removeSubscriber(void
*/*svc*/,
//remove entry
int rc = serializer->destroySerializerMap(serializer->handle,
entry->second.msgTypes);
if (rc != 0) {
- L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for
TopicReceiver %s/%s", m_scope.c_str(), m_topic.c_str());
+ L.ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for
TopicReceiver ", m_scope.c_str(), "/",m_topic.c_str(),"\n");
}
subscribers.map.erase(bndId);
}
@@ -267,23 +265,18 @@ void
pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_s
msgSer->freeMsg(msgSer->handle, deserializedMsg);
}
} else {
- //L_WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type %s for
scope/topic %s/%s", msgSer->msgName, scope, topic);
+ L.WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type ",
msgSer->msgName , "for scope/topic ", scope(), "/", topic());
}
}
} else {
- L_WARN("[PSA_NANOMSG_TR] Cannot find serializer for type id %i",
hdr->type);
+ L.WARN("[PSA_NANOMSG_TR] Cannot find serializer for type id ",
hdr->type);
}
}
void pubsub::nanomsg::topic_receiver::processMsg(const
pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) {
std::lock_guard<std::mutex> _lock(subscribers.mutex);
- //hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map);
- //while (hashMapIterator_hasNext(&iter)) {
for (auto entry : subscribers.map) {
- //psa_nanomsg_subscriber_entry_t *entry =
static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter));
- //if (entry != NULL) {
- processMsgForSubscriberEntry(&entry.second, hdr, payload,
payloadSize);
- //}
+ processMsgForSubscriberEntry(&entry.second, hdr, payload, payloadSize);
}
}
@@ -293,12 +286,7 @@ struct Message {
};
void pubsub::nanomsg::topic_receiver::recvThread_exec() {
- bool running{};
- {
- std::lock_guard<std::mutex> _lock(recvThread.mutex);
- running = recvThread.running;
- }
- while (running) {
+ while (recvThread.running) {
Message *msg = nullptr;
nn_iovec iov[2];
iov[0].iov_base = &msg;
@@ -319,13 +307,13 @@ void pubsub::nanomsg::topic_receiver::recvThread_exec() {
processMsg(&msg->header, msg->payload,
recvBytes-sizeof(msg->header));
nn_freemsg(msg);
} else if (recvBytes >= 0) {
- L_ERROR("[PSA_NANOMSG_TR] Error receiving nanmosg msg, size (%d)
smaller than header\n", recvBytes);
+ L.ERROR("[PSA_NANOMSG_TR] Error receiving nanmosg msg, size (",
recvBytes,") smaller than header\n");
} else if (errno == EAGAIN || errno == ETIMEDOUT) {
- //nop
+ // no data: go to next cycle
} else if (errno == EINTR) {
- L_DEBUG("[PSA_NANOMSG_TR] nn_recvmsg interrupted");
+ L.DBG("[PSA_NANOMSG_TR] nn_recvmsg interrupted");
} else {
- L_WARN("[PSA_NANOMSG_TR] Error receiving nanomessage: errno %d:
%s\n", errno, strerror(errno));
+ L.WARN("[PSA_NANOMSG_TR] Error receiving nanomessage: errno ",
errno, " : ", strerror(errno), "\n");
}
} // while
diff --git
a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 6b0950a..b1ac457 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -23,7 +23,7 @@
#include <mutex>
#include <map>
#include "pubsub_serializer.h"
-#include "log_helper.h"
+#include "LogHelper.h"
#include "celix_bundle_context.h"
#include "pubsub_nanomsg_common.h"
#include "pubsub/subscriber.h"
@@ -72,7 +72,7 @@ namespace pubsub {
public:
topic_receiver(celix_bundle_context_t
*ctx,
- log_helper_t *logHelper,
+ celix::pubsub::nanomsg::LogHelper& logHelper,
const std::string &scope,
const std::string &topic,
long serializerSvcId, pubsub_serializer_service_t
@@ -96,7 +96,7 @@ namespace pubsub {
private:
celix_bundle_context_t *ctx{nullptr};
- log_helper_t *logHelper{nullptr};
+ celix::pubsub::nanomsg::LogHelper L;
long m_serializerSvcId{0};
pubsub_serializer_service_t *serializer{nullptr};
const std::string m_scope{};
diff --git
a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index 7521d78..b03881a 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@ -24,7 +24,7 @@
#include <utils.h>
#include <arpa/inet.h>
#include <zconf.h>
-
+#include <LogHelper.h>
#include <nanomsg/nn.h>
#include <nanomsg/bus.h>
@@ -38,49 +38,11 @@
#define FIRST_SEND_DELAY_IN_SECONDS 2
#define NANOMSG_BIND_MAX_RETRY 10
-template <typename T>
-std::stringstream LOG_STREAM(T first) {
- std::stringstream ss;
- ss << first;
- return ss;
-}
-
-template <typename T, typename... Args>
-std::stringstream LOG_STREAM(T first, Args... args) {
- std::stringstream ss;
- ss << first << LOG_STREAM(args...).str();
- return ss;
-}
-
-template <typename... Args>
-void L_DEBUG(log_helper_t *logHelper, Args... args) {
- std::stringstream ss = LOG_STREAM(args...);
- logHelper_log(logHelper, OSGI_LOGSERVICE_DEBUG, ss.str().c_str());
-}
-
-template <typename... Args>
-void L_INFO(log_helper_t *logHelper, Args... args) {
- auto ss = LOG_STREAM(args...);
- logHelper_log(logHelper, OSGI_LOGSERVICE_INFO, ss.str().c_str());
-}
-
-template <typename... Args>
-void L_WARN(log_helper_t *logHelper, Args... args) {
- auto ss = LOG_STREAM(args...);
- logHelper_log(logHelper, OSGI_LOGSERVICE_WARNING, ss.str().c_str());
-}
-
-template <typename... Args>
-void L_ERROR(log_helper_t *logHelper, Args... args) {
- auto ss = LOG_STREAM(args...);
- logHelper_log((log_helper_pt)logHelper, OSGI_LOGSERVICE_ERROR,
ss.str().c_str());
-}
-
static unsigned int rand_range(unsigned int min, unsigned int max);
-static void delay_first_send_for_late_joiners(log_helper_t* logHelper);
+static void
delay_first_send_for_late_joiners(celix::pubsub::nanomsg::LogHelper& logHelper);
pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_bundle_context_t
*_ctx,
- log_helper_t
*_logHelper,
+
celix::pubsub::nanomsg::LogHelper& _logHelper,
const char *_scope,
const char *_topic,
long _serializerSvcId,
@@ -89,7 +51,7 @@
pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_
unsigned int
_basePort,
unsigned int
_maxPort) :
ctx{_ctx},
- logHelper{_logHelper},
+ L{_logHelper},
serializerSvcId {_serializerSvcId},
serializer{_ser},
scope{_scope},
@@ -194,7 +156,7 @@ void*
pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const ce
} else {
auto entry = boundedServices.map.emplace(std::piecewise_construct,
std::forward_as_tuple(bndId),
- std::forward_as_tuple(scope, topic, bndId,
nanomsg.socket, logHelper));
+ std::forward_as_tuple(scope, topic, bndId,
nanomsg.socket, L));
int rc = serializer->createSerializerMap(serializer->handle,
(celix_bundle_t*)requestingBundle, &entry.first->second.msgTypes);
if (rc == 0) {
@@ -207,10 +169,7 @@ void*
pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const ce
service = &entry.first->second.service;
} else {
boundedServices.map.erase(bndId);
- auto x = LOG_STREAM(12, "hallo");
- logHelper_log(logHelper, OSGI_LOGSERVICE_DEBUG, x.str().c_str());
- log_helper_pt lh = logHelper;
- L_ERROR(lh, "Error creating serializer map for NanoMsg TopicSender
", scope, topic);
+ L.ERROR("Error creating serializer map for NanoMsg TopicSender.
Scope: ", scope, ", Topic: ", topic);
}
}
@@ -228,7 +187,7 @@ void
pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const c
if (entry->second.getCount == 0) {
int rc = serializer->destroySerializerMap(serializer->handle,
entry->second.msgTypes);
if (rc != 0) {
- L_ERROR(logHelper, "Error destroying publisher service,
serializer not available / cannot get msg serializer map\n");
+ L.ERROR("Error destroying publisher service, serializer not
available / cannot get msg serializer map\n");
}
boundedServices.map.erase(bndId);
}
@@ -240,11 +199,11 @@ int
pubsub::nanomsg::bounded_service_entry::topicPublicationSend(unsigned int ms
auto msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(msgTypes,
(void*)(uintptr_t)msgTypeId));
if (msgSer != nullptr) {
- delay_first_send_for_late_joiners(logHelper);
+ delay_first_send_for_late_joiners(L);
int major = 0, minor = 0;
- pubsub_nanmosg_msg_header_t msg_hdr;// = calloc(1, sizeof(*msg_hdr));
+ pubsub_nanmosg_msg_header_t msg_hdr{};// = calloc(1, sizeof(*msg_hdr));
msg_hdr.type = msgTypeId;
if (msgSer->msgVersion != nullptr) {
@@ -273,31 +232,31 @@ int
pubsub::nanomsg::bounded_service_entry::topicPublicationSend(unsigned int ms
int rc = nn_sendmsg(nanoMsgSocket, &msg, 0 );
free(serializedOutput);
if (rc < 0) {
- L_WARN(logHelper, "[PSA_ZMQ_TS] Error sending zmsg, rc: ", rc,
", error: ", strerror(errno));
+ L.WARN("[PSA_ZMQ_TS] Error sending zmsg, rc: ", rc, ", error:
", strerror(errno));
} else {
- L_INFO(logHelper, "[PSA_ZMQ_TS] Send message with size ", rc,
"\n");
- L_INFO(logHelper, "[PSA_ZMQ_TS] Send message ID ",
msg_hdr.type,
+ L.INFO("[PSA_ZMQ_TS] Send message with size ", rc, "\n");
+ L.INFO("[PSA_ZMQ_TS] Send message ID ", msg_hdr.type,
" major: ", (int)msg_hdr.major,
" minor: ", (int)msg_hdr.minor,"\n");
}
} else {
- L_WARN(logHelper, "[PSA_ZMQ_TS] Error serialize message of type ",
msgSer->msgName,
+ L.WARN("[PSA_ZMQ_TS] Error serialize message of type ",
msgSer->msgName,
" for scope/topic ", scope.c_str(), "/",
topic.c_str(),"\n");
}
} else {
status = CELIX_SERVICE_EXCEPTION;
- L_WARN(logHelper, "[PSA_ZMQ_TS] Error cannot serialize message with
msg type id ", msgTypeId,
+ L.WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id
", msgTypeId,
" for scope/topic ", scope.c_str(), "/", topic.c_str(),"\n");
}
return status;
}
-static void delay_first_send_for_late_joiners(log_helper_t* logHelper) {
+static void
delay_first_send_for_late_joiners(celix::pubsub::nanomsg::LogHelper& logHelper)
{
static bool firstSend = true;
if(firstSend){
- L_INFO(logHelper, "PSA_UDP_MC_TP: Delaying first send for late
joiners...\n");
+ logHelper.INFO("PSA_UDP_MC_TP: Delaying first send for late
joiners...\n");
sleep(FIRST_SEND_DELAY_IN_SECONDS);
firstSend = false;
}
diff --git
a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
index 6a3254e..6fd2bb8 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
@@ -35,7 +35,7 @@ namespace pubsub {
std::string &_topic,
long _bndId,
int _nanoMsgSocket,
- log_helper_t* _logHelper) : scope{_scope}, topic{_topic},
bndId{_bndId}, nanoMsgSocket{_nanoMsgSocket}, logHelper{_logHelper} {
+ celix::pubsub::nanomsg::LogHelper& _logHelper) :
scope{_scope}, topic{_topic}, bndId{_bndId}, nanoMsgSocket{_nanoMsgSocket},
L{_logHelper} {
}
bounded_service_entry(const bounded_service_entry&) = delete;
@@ -49,13 +49,14 @@ namespace pubsub {
hash_map_t *msgTypes{};
int getCount{1};
int nanoMsgSocket{};
- log_helper_t *logHelper{};
+ celix::pubsub::nanomsg::LogHelper L;
} ;
class pubsub_nanomsg_topic_sender {
public:
- pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx,
log_helper_t *_logHelper, const char *_scope,
+ pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx,
+ celix::pubsub::nanomsg::LogHelper&
_logHelper, const char *_scope,
const char *_topic, long
_serializerSvcId, pubsub_serializer_service_t *_ser,
const char *_bindIp, unsigned int
_basePort, unsigned int _maxPort);
@@ -79,7 +80,7 @@ namespace pubsub {
//private:
celix_bundle_context_t *ctx;
- log_helper_t *logHelper;
+ celix::pubsub::nanomsg::LogHelper L;
long serializerSvcId;
pubsub_serializer_service_t *serializer;
diff --git a/doap/doap_Celix.rdf b/doap/doap_Celix.rdf
old mode 100755
new mode 100644