adam-markovics commented on code in PR #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r921170250
##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -15,147 +15,167 @@
* limitations under the License.
*/
#include "AbstractMQTTProcessor.h"
-#include <cstdio>
#include <memory>
#include <string>
-#include <cinttypes>
-#include <vector>
+#include <utility>
-#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
namespace org::apache::nifi::minifi::processors {
void AbstractMQTTProcessor::onSchedule(const
std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSessionFactory>& /*factory*/) {
- sslEnabled_ = false;
- sslopts_ = MQTTClient_SSLOptions_initializer;
-
- std::string value;
- int64_t valInt;
- value = "";
- if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
- uri_ = value;
- logger_->log_debug("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
- }
- value = "";
- if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
- clientID_ = value;
+ if (auto value = context->getProperty(BrokerURI)) {
+ uri_ = std::move(*value);
+ logger_->log_debug("AbstractMQTTProcessor: BrokerURI [%s]", uri_);
+ }
+ if (auto value = context->getProperty(ClientID)) {
+ clientID_ = std::move(*value);
logger_->log_debug("AbstractMQTTProcessor: ClientID [%s]", clientID_);
}
- value = "";
- if (context->getProperty(Topic.getName(), value) && !value.empty()) {
- topic_ = value;
+ if (auto value = context->getProperty(Topic)) {
+ topic_ = std::move(*value);
logger_->log_debug("AbstractMQTTProcessor: Topic [%s]", topic_);
}
- value = "";
- if (context->getProperty(UserName.getName(), value) && !value.empty()) {
- userName_ = value;
- logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", userName_);
+ if (auto value = context->getProperty(Username)) {
+ username_ = std::move(*value);
+ logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", username_);
}
- value = "";
- if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
- passWord_ = value;
- logger_->log_debug("AbstractMQTTProcessor: PassWord [%s]", passWord_);
+ if (auto value = context->getProperty(Password)) {
+ password_ = std::move(*value);
+ logger_->log_debug("AbstractMQTTProcessor: Password [%s]", password_);
}
- const auto cleanSession_parsed = [&] () -> std::optional<bool> {
- std::string property_value;
- if (!context->getProperty(CleanSession.getName(), property_value)) return
std::nullopt;
- return utils::StringUtils::toBool(property_value);
- }();
- if ( cleanSession_parsed ) {
- cleanSession_ = *cleanSession_parsed;
- logger_->log_debug("AbstractMQTTProcessor: CleanSession [%d]",
cleanSession_);
+ if (const auto keep_alive_interval =
context->getProperty<core::TimePeriodValue>(KeepAliveInterval)) {
+ keep_alive_interval_ =
std::chrono::duration_cast<std::chrono::seconds>(keep_alive_interval->getMilliseconds());
+ logger_->log_debug("AbstractMQTTProcessor: KeepAliveInterval [%" PRId64 "]
s", int64_t{keep_alive_interval_.count()});
}
- if (auto keep_alive_interval =
context->getProperty<core::TimePeriodValue>(KeepLiveInterval)) {
- keepAliveInterval_ = keep_alive_interval->getMilliseconds();
- logger_->log_debug("AbstractMQTTProcessor: KeepLiveInterval [%" PRId64 "]
ms", int64_t{keepAliveInterval_.count()});
+ if (const auto value = context->getProperty<uint64_t>(MaxFlowSegSize)) {
+ max_seg_size_ = {*value};
+ logger_->log_debug("PublishMQTT: max flow segment size [%" PRIu64 "]",
max_seg_size_);
}
- if (auto connection_timeout =
context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
- connectionTimeout_ = connection_timeout->getMilliseconds();
- logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "]
ms", int64_t{connectionTimeout_.count()});
+ if (const auto connection_timeout =
context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
+ connection_timeout_ =
std::chrono::duration_cast<std::chrono::seconds>(connection_timeout->getMilliseconds());
+ logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "]
s", int64_t{connection_timeout_.count()});
}
- value = "";
- if (context->getProperty(QOS.getName(), value) && !value.empty() && (value
== MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) &&
- core::Property::StringToInt(value, valInt)) {
- qos_ = valInt;
- logger_->log_debug("AbstractMQTTProcessor: QOS [%" PRId64 "]", qos_);
+ if (const auto value = context->getProperty<uint32_t>(QoS); value && (*value
== MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
+ qos_ = {*value};
+ logger_->log_debug("AbstractMQTTProcessor: QoS [%" PRIu32 "]", qos_);
}
- value = "";
- if (context->getProperty(SecurityProtocol.getName(), value) &&
!value.empty()) {
- if (value == MQTT_SECURITY_PROTOCOL_SSL) {
- sslEnabled_ = true;
- value = "";
- if (context->getProperty(SecurityCA.getName(), value) && !value.empty())
{
- logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value);
- securityCA_ = value;
- sslopts_.trustStore = securityCA_.c_str();
+ if (const auto security_protocol = context->getProperty(SecurityProtocol)) {
+ if (*security_protocol == MQTT_SECURITY_PROTOCOL_SSL) {
+ sslOpts_ = MQTTAsync_SSLOptions_initializer;
+ if (auto value = context->getProperty(SecurityCA)) {
+ logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", *value);
+ securityCA_ = std::move(*value);
+ sslOpts_->trustStore = securityCA_.c_str();
}
- value = "";
- if (context->getProperty(SecurityCert.getName(), value) &&
!value.empty()) {
- logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", value);
- securityCert_ = value;
- sslopts_.keyStore = securityCert_.c_str();
+ if (auto value = context->getProperty(SecurityCert)) {
+ logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", *value);
+ securityCert_ = std::move(*value);
+ sslOpts_->keyStore = securityCert_.c_str();
}
- value = "";
- if (context->getProperty(SecurityPrivateKey.getName(), value) &&
!value.empty()) {
- logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", value);
- securityPrivateKey_ = value;
- sslopts_.privateKey = securityPrivateKey_.c_str();
+ if (auto value = context->getProperty(SecurityPrivateKey)) {
+ logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", *value);
+ securityPrivateKey_ = std::move(*value);
+ sslOpts_->privateKey = securityPrivateKey_.c_str();
}
- value = "";
- if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) &&
!value.empty()) {
- logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]",
value);
- securityPrivateKeyPassWord_ = value;
- sslopts_.privateKeyPassword = securityPrivateKeyPassWord_.c_str();
+ if (auto value = context->getProperty(SecurityPrivateKeyPassword)) {
+ logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]",
*value);
+ securityPrivateKeyPassword_ = std::move(*value);
+ sslOpts_->privateKeyPassword = securityPrivateKeyPassword_.c_str();
}
}
}
+
+ if (auto last_will_topic = context->getProperty(LastWillTopic);
last_will_topic.has_value() && !last_will_topic->empty()) {
+ last_will_ = MQTTAsync_willOptions_initializer;
+
+ logger_->log_debug("AbstractMQTTProcessor: Last Will Topic [%s]",
*last_will_topic);
+ last_will_topic_ = std::move(*last_will_topic);
+ last_will_->topicName = last_will_topic_.c_str();
+
+ if (auto value = context->getProperty(LastWillMessage)) {
+ logger_->log_debug("AbstractMQTTProcessor: Last Will Message [%s]",
*value);
+ last_will_message_ = std::move(*value);
+ last_will_->message = last_will_message_.c_str();
+ }
+
+ if (const auto value = context->getProperty<uint32_t>(LastWillQoS); value
&& (*value == MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
+ logger_->log_debug("AbstractMQTTProcessor: Last Will QoS [%" PRIu32 "]",
*value);
+ last_will_qos_ = {*value};
+ last_will_->qos = gsl::narrow<int>(last_will_qos_);
+ }
+
+ if (const auto value = context->getProperty<bool>(LastWillRetain)) {
+ logger_->log_debug("AbstractMQTTProcessor: Last Will Retain [%d]",
*value);
+ last_will_retain_ = {*value};
+ last_will_->retained = last_will_retain_;
+ }
+ }
+
if (!client_) {
- MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(),
MQTTCLIENT_PERSISTENCE_NONE, nullptr);
+ if (MQTTAsync_create(&client_, uri_.c_str(), clientID_.c_str(),
MQTTCLIENT_PERSISTENCE_NONE, nullptr) != MQTTASYNC_SUCCESS) {
+ logger_->log_error("Creating MQTT client failed");
+ }
}
if (client_) {
- MQTTClient_setCallbacks(client_, this, connectionLost, msgReceived,
msgDelivered);
+ MQTTAsync_setCallbacks(client_, this, connectionLost, msgReceived,
nullptr);
Review Comment:
What do you mean? Checking return value of `MQTTAsync_setCallbacks`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]