Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 95e676a2b -> 83fa06e2a


MINIFICPP-393: Add security support for MQTT

This closes #259.

Signed-off-by: Bin Qiu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/83fa06e2
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/83fa06e2
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/83fa06e2

Branch: refs/heads/master
Commit: 83fa06e2a619824f46800bafde8a3745278c7074
Parents: 95e676a
Author: Bin Qiu <[email protected]>
Authored: Mon Feb 5 07:32:14 2018 -0800
Committer: Bin Qiu <[email protected]>
Committed: Thu Feb 8 15:49:21 2018 -0800

----------------------------------------------------------------------
 extensions/mqtt/AbstractMQTTProcessor.cpp | 41 ++++++++++++++++++++++++++
 extensions/mqtt/AbstractMQTTProcessor.h   | 14 +++++++++
 extensions/mqtt/ConsumeMQTT.cpp           | 17 ++++++++---
 extensions/mqtt/ConsumeMQTT.h             |  5 +++-
 4 files changed, 72 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/83fa06e2/extensions/mqtt/AbstractMQTTProcessor.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/AbstractMQTTProcessor.cpp 
b/extensions/mqtt/AbstractMQTTProcessor.cpp
index 409b69f..345c6c5 100644
--- a/extensions/mqtt/AbstractMQTTProcessor.cpp
+++ b/extensions/mqtt/AbstractMQTTProcessor.cpp
@@ -41,6 +41,11 @@ core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep 
Alive Interval", "D
 core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", 
"Maximum time interval the client will wait for the network connection to the 
MQTT server", "30 sec");
 core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality 
of Service(QoS) to send the message with. Accepts three values '0', '1' and 
'2'", "MQTT_QOS_0");
 core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the 
message to", "");
+core::Property AbstractMQTTProcessor::SecurityProtocol("Security Protocol", 
"Protocol used to communicate with brokers", "");
+core::Property AbstractMQTTProcessor::SecurityCA("Security CA", "File or 
directory path to CA certificate(s) for verifying the broker's key", "");
+core::Property AbstractMQTTProcessor::SecurityCert("Security Cert", "Path to 
client's public key (PEM) used for authentication", "");
+core::Property AbstractMQTTProcessor::SecurityPrivateKey("Security Private 
Key", "Path to client's private key (PEM) used for authentication", "");
+core::Property AbstractMQTTProcessor::SecurityPrivateKeyPassWord("Security 
Pass Phrase", "Private key passphrase", "");
 core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that 
are sent successfully to the destination are transferred to this relationship");
 core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that 
failed to send to the destination are transferred to this relationship");
 
@@ -62,6 +67,8 @@ void AbstractMQTTProcessor::initialize() {
   relationships.insert(Success);
   relationships.insert(Failure);
   setSupportedRelationships(relationships);
+  MQTTClient_SSLOptions sslopts_ = MQTTClient_SSLOptions_initializer;
+  sslEnabled_ = false;
 }
 
 void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
@@ -119,6 +126,38 @@ void 
AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::Proc
     qos_ = valInt;
     logger_->log_debug("AbstractMQTTProcessor: QOS [%ll]", qos_);
   }
+  value = "";
+
+  if (context->getProperty(SecurityProtocol.getName(), value) && 
!value.empty()) {
+    if (value == MQTT_SECURITY_PROTOCOL_SSL) {
+      sslEnabled_ = true;
+      logger_->log_debug("AbstractMQTTProcessor: ssl enable");
+      value = "";
+      if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) 
{
+        logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value);
+        securityCA_ = value;
+        sslopts_.trustStore = securityCA_.c_str();
+      }
+      value = "";
+      if (context->getProperty(SecurityCert.getName(), value) && 
!value.empty()) {
+        logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", value);
+        securityCert_ = value;
+        sslopts_.keyStore = securityCert_.c_str();
+      }
+      value = "";
+      if (context->getProperty(SecurityPrivateKey.getName(), value) && 
!value.empty()) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", value);
+        securityPrivateKey_ = value;
+        sslopts_.privateKey = securityPrivateKey_.c_str();
+      }
+      value = "";
+      if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && 
!value.empty()) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", 
value);
+        securityPrivateKeyPassWord_ = value;
+        sslopts_.privateKeyPassword = securityPrivateKeyPassWord_.c_str();
+      }
+    }
+  }
   if (!client_) {
     MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), 
MQTTCLIENT_PERSISTENCE_NONE, NULL);
   }
@@ -141,6 +180,8 @@ bool AbstractMQTTProcessor::reconnect() {
     conn_opts.username = userName_.c_str();
     conn_opts.password = passWord_.c_str();
   }
+  if (sslEnabled_)
+    conn_opts.ssl = &sslopts_;
   if (MQTTClient_connect(client_, &conn_opts) != MQTTCLIENT_SUCCESS) {
     logger_->log_error("Failed to connect to MQTT broker %s", uri_);
     return false;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/83fa06e2/extensions/mqtt/AbstractMQTTProcessor.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/AbstractMQTTProcessor.h 
b/extensions/mqtt/AbstractMQTTProcessor.h
index 7870c2b..7278f15 100644
--- a/extensions/mqtt/AbstractMQTTProcessor.h
+++ b/extensions/mqtt/AbstractMQTTProcessor.h
@@ -38,6 +38,9 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
+#define MQTT_SECURITY_PROTOCOL_PLAINTEXT "plaintext"
+#define MQTT_SECURITY_PROTOCOL_SSL "ssl"
+
 // AbstractMQTTProcessor Class
 class AbstractMQTTProcessor : public core::Processor {
  public:
@@ -78,6 +81,11 @@ class AbstractMQTTProcessor : public core::Processor {
   static core::Property ConnectionTimeOut;
   static core::Property Topic;
   static core::Property QOS;
+  static core::Property SecurityProtocol;
+  static core::Property SecurityCA;
+  static core::Property SecurityCert;
+  static core::Property SecurityPrivateKey;
+  static core::Property SecurityPrivateKeyPassWord;
 
   // Supported Relationships
   static core::Relationship Failure;
@@ -141,6 +149,12 @@ class AbstractMQTTProcessor : public core::Processor {
 
  private:
   std::shared_ptr<logging::Logger> logger_;
+  MQTTClient_SSLOptions sslopts_;
+  std::string sslEnabled_;
+  std::string securityCA_;
+  std::string securityCert_;
+  std::string securityPrivateKey_;
+  std::string securityPrivateKeyPassWord_;
 };
 
 REGISTER_RESOURCE(AbstractMQTTProcessor);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/83fa06e2/extensions/mqtt/ConsumeMQTT.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/ConsumeMQTT.cpp b/extensions/mqtt/ConsumeMQTT.cpp
index 21cb79d..472d35f 100644
--- a/extensions/mqtt/ConsumeMQTT.cpp
+++ b/extensions/mqtt/ConsumeMQTT.cpp
@@ -35,7 +35,8 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property ConsumeMQTT::MaxQueueSize("Max Flow Segment Size", "Maximum 
flow content payload segment size for the MQTT record", "");
+core::Property ConsumeMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum 
flow content payload segment size for the MQTT record", "");
+core::Property ConsumeMQTT::QueueBufferMaxMessage("Queue Max Message", 
"Maximum number of messages allowed on the received MQTT queue", "");
 
 void ConsumeMQTT::initialize() {
   // Set the supported properties
@@ -49,7 +50,8 @@ void ConsumeMQTT::initialize() {
   properties.insert(ConnectionTimeOut);
   properties.insert(QOS);
   properties.insert(Topic);
-  properties.insert(MaxQueueSize);
+  properties.insert(MaxFlowSegSize);
+  properties.insert(QueueBufferMaxMessage);
   setSupportedProperties(properties);
   // Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -62,6 +64,8 @@ bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message 
*message) {
     logger_->log_debug("MQTT queue full");
     return false;
   } else {
+    if (message->payloadlen > maxSegSize_)
+      message->payloadlen = maxSegSize_;
     queue_.enqueue(message);
     logger_->log_debug("enqueue MQTT message length %d", message->payloadlen);
     return true;
@@ -73,9 +77,14 @@ void ConsumeMQTT::onSchedule(core::ProcessContext *context, 
core::ProcessSession
   std::string value;
   int64_t valInt;
   value = "";
-  if (context->getProperty(MaxQueueSize.getName(), value) && !value.empty() && 
core::Property::StringToInt(value, valInt)) {
+  if (context->getProperty(QueueBufferMaxMessage.getName(), value) && 
!value.empty() && core::Property::StringToInt(value, valInt)) {
     maxQueueSize_ = valInt;
-    logger_->log_debug("ConsumeMQTT: max queue size [%ll]", maxQueueSize_);
+    logger_->log_debug("ConsumeMQTT: Queue Max Message [%ll]", maxQueueSize_);
+  }
+  value = "";
+  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() 
&& core::Property::StringToInt(value, valInt)) {
+    maxSegSize_ = valInt;
+    logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%ll]", 
maxSegSize_);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/83fa06e2/extensions/mqtt/ConsumeMQTT.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/ConsumeMQTT.h b/extensions/mqtt/ConsumeMQTT.h
index f5155fb..da3ca6c 100644
--- a/extensions/mqtt/ConsumeMQTT.h
+++ b/extensions/mqtt/ConsumeMQTT.h
@@ -53,6 +53,7 @@ public:
     : processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory<ConsumeMQTT>::getLogger()) {
     isSubscriber_ = true;
     maxQueueSize_ = 100;
+    maxSegsize_ = ULLONG_MAX;
   }
   // Destructor
   virtual ~ConsumeMQTT() {
@@ -64,7 +65,8 @@ public:
   // Processor Name
   static constexpr char const* ProcessorName = "ConsumeMQTT";
   // Supported Properties
-  static core::Property MaxQueueSize;
+  static core::Property MaxFlowSegSize;
+  static core::Property QueueBufferMaxMessage;
   // Nest Callback Class for write stream
   class WriteCallback: public OutputStreamCallback {
   public:
@@ -108,6 +110,7 @@ private:
   std::shared_ptr<logging::Logger> logger_;
   std::mutex mutex_;
   uint64_t maxQueueSize_;
+  uint64_t maxSegSize_;
   moodycamel::ConcurrentQueue<MQTTClient_message *> queue_;
 };
 

Reply via email to