Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 95e676a2b -> 83fa06e2a
MINIFICPP-393: Add security support for MQTT This closes #259. Signed-off-by: Bin Qiu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/83fa06e2 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/83fa06e2 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/83fa06e2 Branch: refs/heads/master Commit: 83fa06e2a619824f46800bafde8a3745278c7074 Parents: 95e676a Author: Bin Qiu <[email protected]> Authored: Mon Feb 5 07:32:14 2018 -0800 Committer: Bin Qiu <[email protected]> Committed: Thu Feb 8 15:49:21 2018 -0800 ---------------------------------------------------------------------- extensions/mqtt/AbstractMQTTProcessor.cpp | 41 ++++++++++++++++++++++++++ extensions/mqtt/AbstractMQTTProcessor.h | 14 +++++++++ extensions/mqtt/ConsumeMQTT.cpp | 17 ++++++++--- extensions/mqtt/ConsumeMQTT.h | 5 +++- 4 files changed, 72 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/83fa06e2/extensions/mqtt/AbstractMQTTProcessor.cpp ---------------------------------------------------------------------- diff --git a/extensions/mqtt/AbstractMQTTProcessor.cpp b/extensions/mqtt/AbstractMQTTProcessor.cpp index 409b69f..345c6c5 100644 --- a/extensions/mqtt/AbstractMQTTProcessor.cpp +++ b/extensions/mqtt/AbstractMQTTProcessor.cpp @@ -41,6 +41,11 @@ core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "D core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Property AbstractMQTTProcessor::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", ""); +core::Property AbstractMQTTProcessor::SecurityCA("Security CA", "File or directory path to CA certificate(s) for verifying the broker's key", ""); +core::Property AbstractMQTTProcessor::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", ""); +core::Property AbstractMQTTProcessor::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", ""); +core::Property AbstractMQTTProcessor::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", ""); core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); @@ -62,6 +67,8 @@ void AbstractMQTTProcessor::initialize() { relationships.insert(Success); relationships.insert(Failure); setSupportedRelationships(relationships); + MQTTClient_SSLOptions sslopts_ = MQTTClient_SSLOptions_initializer; + sslEnabled_ = false; } void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { @@ -119,6 +126,38 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::Proc qos_ = valInt; logger_->log_debug("AbstractMQTTProcessor: QOS [%ll]", qos_); } + value = ""; + + if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) { + if (value == MQTT_SECURITY_PROTOCOL_SSL) { + sslEnabled_ = true; + logger_->log_debug("AbstractMQTTProcessor: ssl enable"); + value = ""; + if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) { + logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value); + securityCA_ = value; + sslopts_.trustStore = securityCA_.c_str(); + } + value = ""; + if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) { + logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", value); + securityCert_ = value; + sslopts_.keyStore = securityCert_.c_str(); + } + value = ""; + if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) { + logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", value); + securityPrivateKey_ = value; + sslopts_.privateKey = securityPrivateKey_.c_str(); + } + value = ""; + if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) { + logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", value); + securityPrivateKeyPassWord_ = value; + sslopts_.privateKeyPassword = securityPrivateKeyPassWord_.c_str(); + } + } + } if (!client_) { MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL); } @@ -141,6 +180,8 @@ bool AbstractMQTTProcessor::reconnect() { conn_opts.username = userName_.c_str(); conn_opts.password = passWord_.c_str(); } + if (sslEnabled_) + conn_opts.ssl = &sslopts_; if (MQTTClient_connect(client_, &conn_opts) != MQTTCLIENT_SUCCESS) { logger_->log_error("Failed to connect to MQTT broker %s", uri_); return false; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/83fa06e2/extensions/mqtt/AbstractMQTTProcessor.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/AbstractMQTTProcessor.h b/extensions/mqtt/AbstractMQTTProcessor.h index 7870c2b..7278f15 100644 --- a/extensions/mqtt/AbstractMQTTProcessor.h +++ b/extensions/mqtt/AbstractMQTTProcessor.h @@ -38,6 +38,9 @@ namespace nifi { namespace minifi { namespace processors { +#define MQTT_SECURITY_PROTOCOL_PLAINTEXT "plaintext" +#define MQTT_SECURITY_PROTOCOL_SSL "ssl" + // AbstractMQTTProcessor Class class AbstractMQTTProcessor : public core::Processor { public: @@ -78,6 +81,11 @@ class AbstractMQTTProcessor : public core::Processor { static core::Property ConnectionTimeOut; static core::Property Topic; static core::Property QOS; + static core::Property SecurityProtocol; + static core::Property SecurityCA; + static core::Property SecurityCert; + static core::Property SecurityPrivateKey; + static core::Property SecurityPrivateKeyPassWord; // Supported Relationships static core::Relationship Failure; @@ -141,6 +149,12 @@ class AbstractMQTTProcessor : public core::Processor { private: std::shared_ptr<logging::Logger> logger_; + MQTTClient_SSLOptions sslopts_; + std::string sslEnabled_; + std::string securityCA_; + std::string securityCert_; + std::string securityPrivateKey_; + std::string securityPrivateKeyPassWord_; }; REGISTER_RESOURCE(AbstractMQTTProcessor); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/83fa06e2/extensions/mqtt/ConsumeMQTT.cpp ---------------------------------------------------------------------- diff --git a/extensions/mqtt/ConsumeMQTT.cpp b/extensions/mqtt/ConsumeMQTT.cpp index 21cb79d..472d35f 100644 --- a/extensions/mqtt/ConsumeMQTT.cpp +++ b/extensions/mqtt/ConsumeMQTT.cpp @@ -35,7 +35,8 @@ namespace nifi { namespace minifi { namespace processors { -core::Property ConsumeMQTT::MaxQueueSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); +core::Property ConsumeMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); +core::Property ConsumeMQTT::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the received MQTT queue", ""); void ConsumeMQTT::initialize() { // Set the supported properties @@ -49,7 +50,8 @@ void ConsumeMQTT::initialize() { properties.insert(ConnectionTimeOut); properties.insert(QOS); properties.insert(Topic); - properties.insert(MaxQueueSize); + properties.insert(MaxFlowSegSize); + properties.insert(QueueBufferMaxMessage); setSupportedProperties(properties); // Set the supported relationships std::set<core::Relationship> relationships; @@ -62,6 +64,8 @@ bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) { logger_->log_debug("MQTT queue full"); return false; } else { + if (message->payloadlen > maxSegSize_) + message->payloadlen = maxSegSize_; queue_.enqueue(message); logger_->log_debug("enqueue MQTT message length %d", message->payloadlen); return true; @@ -73,9 +77,14 @@ void ConsumeMQTT::onSchedule(core::ProcessContext *context, core::ProcessSession std::string value; int64_t valInt; value = ""; - if (context->getProperty(MaxQueueSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { + if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { maxQueueSize_ = valInt; - logger_->log_debug("ConsumeMQTT: max queue size [%ll]", maxQueueSize_); + logger_->log_debug("ConsumeMQTT: Queue Max Message [%ll]", maxQueueSize_); + } + value = ""; + if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { + maxSegSize_ = valInt; + logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%ll]", maxSegSize_); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/83fa06e2/extensions/mqtt/ConsumeMQTT.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/ConsumeMQTT.h b/extensions/mqtt/ConsumeMQTT.h index f5155fb..da3ca6c 100644 --- a/extensions/mqtt/ConsumeMQTT.h +++ b/extensions/mqtt/ConsumeMQTT.h @@ -53,6 +53,7 @@ public: : processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory<ConsumeMQTT>::getLogger()) { isSubscriber_ = true; maxQueueSize_ = 100; + maxSegsize_ = ULLONG_MAX; } // Destructor virtual ~ConsumeMQTT() { @@ -64,7 +65,8 @@ public: // Processor Name static constexpr char const* ProcessorName = "ConsumeMQTT"; // Supported Properties - static core::Property MaxQueueSize; + static core::Property MaxFlowSegSize; + static core::Property QueueBufferMaxMessage; // Nest Callback Class for write stream class WriteCallback: public OutputStreamCallback { public: @@ -108,6 +110,7 @@ private: std::shared_ptr<logging::Logger> logger_; std::mutex mutex_; uint64_t maxQueueSize_; + uint64_t maxSegSize_; moodycamel::ConcurrentQueue<MQTTClient_message *> queue_; };
