Github user phrocker commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792117
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include <stdio.h>
+#include <memory>
+#include <string>
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// Static definitions of the properties and relationships that
+// AbstractMQTTProcessor::initialize() registers as supported.
+// Each core::Property below is built from three string literals; judging by
+// their contents these are the display name, a description and the default
+// value (assumption - confirm against the core::Property constructor).
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state",
"Whether to start afresh or resume previous flows. See the allowable value
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to
use when connecting to the broker", "");
+// The two interval properties carry a value plus a unit ("60 sec", "30 sec");
+// onSchedule() parses them with core::Property::StringToTime.
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive
Interval", "Defines the maximum time interval between messages sent or
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection
Timeout", "Maximum time interval the client will wait for the network
connection to the MQTT server", "30 sec");
+// NOTE(review): the description lists '0', '1' and '2' as the accepted values
+// while the third argument is "MQTT_QOS_0" - confirm which form is expected.
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The
Quality of Service(QoS) to send the message with. Accepts three values '0', '1'
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish
the message to", "");
+// Relationships, each built from a name and a description string.
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles
that are sent successfully to the destination are transferred to this
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles
that failed to send to the destination are transferred to this relationship");
+
+/**
+ * Registers the properties and relationships this processor supports.
+ *
+ * Builds one set holding the nine static properties and one set holding the
+ * Success/Failure relationships, then passes each set to the matching
+ * setSupported*() call.
+ */
+void AbstractMQTTProcessor::initialize() {
+ // Collect all supported properties in a single initializer list.
+ std::set<core::Property> supportedProperties = {
+ BrokerURL,
+ CleanSession,
+ ClientID,
+ UserName,
+ PassWord,
+ KeepLiveInterval,
+ ConnectionTimeOut,
+ QOS,
+ Topic
+ };
+ setSupportedProperties(supportedProperties);
+
+ // Collect the supported relationships the same way.
+ std::set<core::Relationship> supportedRelationships = { Success, Failure };
+ setSupportedRelationships(supportedRelationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context,
core::ProcessSessionFactory *sessionFactory) {
+ std::string value;
+ int64_t valInt;
+ value = "";
+ if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+ uri_ = value;
+ logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+ }
+ value = "";
+ if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+ clientID_ = value;
+ logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+ }
+ value = "";
+ if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+ topic_ = value;
+ logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+ }
+ value = "";
+ if (context->getProperty(UserName.getName(), value) && !value.empty()) {
+ userName_ = value;
+ logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
+ }
+ value = "";
+ if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
+ passWord_ = value;
+ logger_->log_info("AbstractMQTTProcessor: PassWord [%s]", passWord_);
+ }
+ value = "";
+ if (context->getProperty(CleanSession.getName(), value) &&
!value.empty() &&
+ org::apache::nifi::minifi::utils::StringUtils::StringToBool(value,
cleanSession_)) {
+ logger_->log_info("AbstractMQTTProcessor: CleanSession [%d]",
cleanSession_);
+ }
+ value = "";
+ if (context->getProperty(KeepLiveInterval.getName(), value) &&
!value.empty()) {
+ core::TimeUnit unit;
+ if (core::Property::StringToTime(value, valInt, unit) &&
core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
+ keepAliveInterval_ = valInt/1000;
+ logger_->log_info("AbstractMQTTProcessor: KeepLiveInterval [%d]",
keepAliveInterval_);
+ }
+ }
+ value = "";
+ if (context->getProperty(ConnectionTimeOut.getName(), value) &&
!value.empty()) {
+ core::TimeUnit unit;
+ if (core::Property::StringToTime(value, valInt, unit) &&
core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
+ connectionTimeOut_ = valInt/1000;
+ logger_->log_info("AbstractMQTTProcessor: ConnectionTimeOut [%d]",
connectionTimeOut_);
--- End diff --
In the cases where you are logging an int64 value, please use %lld (or the PRId64 macro from <cinttypes>) instead of %d
---