MINIFICPP-394: Implement MQTT C2 protocol along with MQTT/REST
translation layer and resolve osx build issues.

This closes #321.

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/8dd7e91f
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/8dd7e91f
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/8dd7e91f

Branch: refs/heads/master
Commit: 8dd7e91f9ae2358cc22b839eeee006f0025b4783
Parents: 30df9a5
Author: Marc Parisi <[email protected]>
Authored: Tue May 1 14:29:02 2018 -0400
Committer: Aldrin Piri <[email protected]>
Committed: Tue May 8 14:03:13 2018 -0400

----------------------------------------------------------------------
 C2.md                                           | 165 ++++++++-
 PROCESSORS.md                                   |  57 ++++
 README.md                                       |  12 +-
 bootstrap.sh                                    |   2 +
 extensions/mqtt/AbstractMQTTProcessor.cpp       | 199 -----------
 extensions/mqtt/AbstractMQTTProcessor.h         | 168 ---------
 extensions/mqtt/CMakeLists.txt                  |  12 +-
 extensions/mqtt/ConsumeMQTT.cpp                 | 119 -------
 extensions/mqtt/ConsumeMQTT.h                   | 125 -------
 extensions/mqtt/MQTTLoader.h                    |  31 +-
 extensions/mqtt/PublishMQTT.cpp                 | 106 ------
 extensions/mqtt/PublishMQTT.h                   | 142 --------
 .../controllerservice/MQTTControllerService.cpp | 100 ++++++
 .../controllerservice/MQTTControllerService.h   | 342 +++++++++++++++++++
 .../mqtt/processors/AbstractMQTTProcessor.cpp   | 199 +++++++++++
 .../mqtt/processors/AbstractMQTTProcessor.h     | 168 +++++++++
 extensions/mqtt/processors/ConsumeMQTT.cpp      | 119 +++++++
 extensions/mqtt/processors/ConsumeMQTT.h        | 125 +++++++
 extensions/mqtt/processors/ConvertBase.cpp      |  69 ++++
 extensions/mqtt/processors/ConvertBase.h        |  90 +++++
 extensions/mqtt/processors/ConvertHeartBeat.cpp |  75 ++++
 extensions/mqtt/processors/ConvertHeartBeat.h   |  78 +++++
 extensions/mqtt/processors/ConvertJSONAck.cpp   | 115 +++++++
 extensions/mqtt/processors/ConvertJSONAck.h     | 105 ++++++
 extensions/mqtt/processors/ConvertUpdate.cpp    | 106 ++++++
 extensions/mqtt/processors/ConvertUpdate.h      |  91 +++++
 extensions/mqtt/processors/PublishMQTT.cpp      | 106 ++++++
 extensions/mqtt/processors/PublishMQTT.h        | 142 ++++++++
 extensions/mqtt/protocol/MQTTC2Protocol.cpp     | 103 ++++++
 extensions/mqtt/protocol/MQTTC2Protocol.h       |  96 ++++++
 extensions/mqtt/protocol/PayloadSerializer.cpp  |  38 +++
 extensions/mqtt/protocol/PayloadSerializer.h    | 318 +++++++++++++++++
 .../rocksdb-repos/DatabaseContentRepository.h   |   4 +-
 libminifi/include/c2/C2Payload.h                |   5 +
 .../include/core/controller/ControllerService.h |   5 +
 .../StandardControllerServiceProvider.h         |   2 +-
 libminifi/include/properties/Configure.h        |  14 +
 libminifi/include/utils/ByteArrayCallback.h     |  16 +-
 libminifi/src/FlowController.cpp                |  16 +-
 libminifi/src/c2/C2Agent.cpp                    |   1 +
 libminifi/src/c2/C2Payload.cpp                  |   7 +
 libminifi/src/core/ProcessSession.cpp           |   2 +-
 .../StandardControllerServiceNode.cpp           |   6 +
 libminifi/src/utils/ByteArrayCallback.cpp       |  15 +
 44 files changed, 2926 insertions(+), 890 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/C2.md
----------------------------------------------------------------------
diff --git a/C2.md b/C2.md
index 194467c..307cd59 100644
--- a/C2.md
+++ b/C2.md
@@ -44,14 +44,16 @@ 
https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal
 
     in minifi.properties
 
-    # Disable/Enable C2
-    nifi.c2.enable=true
+       # Disable/Enable C2
+       nifi.c2.enable=true
 
        # specify classes for the AST response
        nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
        
        # specify C2 protocol -- default is RESTSender if this is not specified
        c2.agent.protocol.class=RESTSender
+       # may also use MQTT
+       # c2.agent.protocol.class=MQTTC2Protocol
        
        # control c2 heartbeat interval in millisecocnds
        c2.agent.heartbeat.period=3000
@@ -71,7 +73,7 @@ 
https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal
        
        # configure SSL Context service for REST Protocol
        c2.rest.ssl.context.service
-       
+
 
 ### Metrics
 
@@ -127,3 +129,160 @@ configuration produces the following JSON:
         }
     }
     
+
+### Protocols
+
+The default protocol is a RESTFul service; however, there is an MQTT protocol 
with a translation to use the 
+RESTFul C2 server. This is useful for cases where an MQTT C2 server isn't 
available, or enclave partioning
+requires a single ingress/egress through a gateway. In these classes of 
devices, MQTT can be used as the intermediate
+or RESTSender can be used for C2 operations.
+
+As defined, above, MQTTC2Protocol can be used for the agent protocol class. If 
you wish to communicate with a RESTFul C2 server,
+you may use the ConvertBase, ConvertHeartBeat, ConvertJSONAack, and 
ConvertUpdate classes on an agent to perform the transation.
+State is not kept with an intermediate agent other than the broker. The broker 
is not embedded with the agent to simplify the agent.
+
+An example configuration, below, defines an agent that receives and forward 
MQTT C2 requests to a C2 server. Additionally, this agent
+will forward responses and updates to the heartbeating agents. 
+
+       MiNiFi Config Version: 3
+       Flow Controller:
+         name: GetFile
+       Core Properties:
+         flow controller graceful shutdown period: 10 sec
+         flow service write delay interval: 500 ms
+         administrative yield duration: 30 sec
+         bored yield duration: 10 millis
+         max concurrent threads: 1
+         variable registry properties: ''
+       FlowFile Repository:
+         partitions: 256
+         checkpoint interval: 2 mins
+         always sync: false
+         Swap:
+           threshold: 20000
+           in period: 5 sec
+           in threads: 1
+           out period: 5 sec
+           out threads: 4
+       Content Repository:
+         content claim max appendable size: 10 MB
+         content claim max flow files: 100
+         always sync: false
+       Provenance Repository:
+         provenance rollover time: 1 min
+       Component Status Repository:
+         buffer size: 1440
+         snapshot frequency: 1 min
+       Security Properties:
+         keystore: ''
+         keystore type: ''
+         keystore password: ''
+         key password: ''
+         truststore: ''
+         truststore type: ''
+         truststore password: ''
+         ssl protocol: ''
+         Sensitive Props:
+           key:
+           algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
+           provider: BC
+       Processors:
+       - id: 24493a28-015a-1000-0000-000000000000
+         name: convert
+         class: org.apache.nifi.processors.standard.ConvertHeartBeat
+         max concurrent tasks: 1
+         scheduling strategy: TIMER_DRIVEN
+         scheduling period: 10 msec
+         penalization period: 30 sec
+         yield period: 2 sec
+         run duration nanos: 10 msec
+         auto-terminated relationships list:
+         Properties:
+           MQTT Controller Service: mqttservice
+           Listening Topic: heartbeats
+       - id: 24493a28-015a-1000-0000-000000000006
+         name: convertJSON
+         class: org.apache.nifi.processors.standard.ConvertJSONAck
+         max concurrent tasks: 1
+         scheduling strategy: TIMER_DRIVEN
+         scheduling period: 10 msec
+         penalization period: 30 sec
+         yield period: 1 sec
+         run duration nanos: 10 msec
+         auto-terminated relationships list:
+         - success
+         Properties:
+           MQTT Controller Service: mqttservice
+       - id: 24493a28-015a-1000-0000-000000000007
+         name: convertupdate
+         class: org.apache.nifi.processors.standard.ConvertUpdate
+         max concurrent tasks: 1
+         scheduling strategy: TIMER_DRIVEN
+         scheduling period: 10 msec
+         penalization period: 30 sec
+         yield period: 1 sec
+         run duration nanos: 10 msec
+         auto-terminated relationships list:
+         - success
+         Properties:
+           MQTT Controller Service: mqttservice
+           Listening Topic: updates
+       - id: 24493a28-015a-1000-0000-000000000021
+         name: httpheartbeat
+         class: org.apache.nifi.processors.standard.InvokeHTTP
+         max concurrent tasks: 1
+         scheduling strategy: TIMER_DRIVEN
+         scheduling period: 10 msec
+         penalization period: 30 sec
+         yield period: 1 sec
+         run duration nanos: 10 msec
+         auto-terminated relationships list:
+         Properties:
+           HTTP Method: POST
+           Remote URL: 
http://localhost:10080/minifi-c2-api/c2-protocol/heartbeat
+           Content-type: application/json
+       - id: 24493a28-015a-1000-0000-000000000022
+         name: log
+         class: org.apache.nifi.processors.standard.LogAttribute
+         max concurrent tasks: 1
+         scheduling strategy: TIMER_DRIVEN
+         scheduling period: 100 msec
+         penalization period: 30 sec
+         yield period: 1 sec
+         run duration nanos: 1 msec
+         auto-terminated relationships list:
+         - success
+         Properties:
+       Controller Services:
+       - id: 94491a38-015a-1000-0000-000000000001
+         name: mqttservice
+         class: MQTTContextService
+         Properties:
+           Broker URI: localhost:1883
+           Client ID: hiblajl
+           Quality of Service: 2
+       Process Groups: []
+       Input Ports: []
+       Output Ports: []
+       Funnels: []
+       Connections:
+       - id: 1d09c015-015d-1000-0000-000000000000
+         name: convert/success/httpheartbeat
+         source id: 24493a28-015a-1000-0000-000000000000
+         source relationship name: success
+         destination id: 24493a28-015a-1000-0000-000000000021
+         max work queue size: 10000
+         max work queue data size: 1 GB
+         flowfile expiration: 0 sec
+         queue prioritizer class: 
org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+       - id: 1d09c015-015d-1000-0000-000000000002
+         name: httpheartbeat/success/convertJSON
+         source id: 24493a28-015a-1000-0000-000000000021
+         source relationship name: success
+         destination id: 24493a28-015a-1000-0000-000000000006
+         max work queue size: 10000
+         max work queue data size: 1 GB
+         flowfile expiration: 0 sec
+         queue prioritizer class: 
org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+       Remote Process Groups: []
+       NiFi Properties Overrides: {}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/PROCESSORS.md
----------------------------------------------------------------------
diff --git a/PROCESSORS.md b/PROCESSORS.md
index e57c16e..4588236 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -21,6 +21,9 @@
 - [ApplyTemplate](#applytemplate)
 - [CompressContent](#compresscontent)
 - [ConsumeMQTT](#consumemqtt)
+- [ConvertHeartBeat](#convertheartbeat)
+- [ConvertJSONAck](#convertjsonack)
+- [ConvertUpdate](#convertupdate)
 - [ExecuteProcess](#executeprocess)
 - [ExecuteScript](#executescript)
 - [ExecuteSQL](#executesql)
@@ -96,6 +99,60 @@ default values, and whether a property supports the NiFi 
Expression Language.
 | - | - |
 | success | All FlowFiles are routed to this relationship. |
 
+## ConvertHeartBeat
+
+This Processor converts MQTT heartbeats into a JSON repreesntation.  
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
+properties (not in bold) are considered optional. The table also indicates any
+default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+| **MQTT Controller Service** | | | The MQTT Controller service |
+| Listening topic | | | The topic on which we will listen to get MQTT C2 
messages |
+
+
+## ConvertJSONAck
+
+This Processor parses C2 respones (acknowledgements) and forwards them to the 
MQTT agent.    
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
+properties (not in bold) are considered optional. The table also indicates any
+default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+| **MQTT Controller Service** | | | The MQTT Controller service |
+| Listening topic | | | The topic on which we will listen to get MQTT C2 
messages |
+
+### Relationships
+
+| Name | Description |
+| - | - |
+| success | Any successful http response flow file will be sent to this 
relationship |
+
+## ConvertUpdate
+
+This converts MQTT update messages into an HTTP request to retrieve an update. 
This
+processor requires cURL support. If it does not exist this processor will be a 
NOOP.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
+properties (not in bold) are considered optional. The table also indicates any
+default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+| **MQTT Controller Service** | | | The MQTT Controller service |
+| SSL Context Service | | | SSL context service used for HTTP requestor.  |
+| Listening topic | | | The topic on which we will listen to get MQTT C2 
messages |
+
 ## ExecuteProcess
 
 ### Description

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 3f85f82..32eb897 100644
--- a/README.md
+++ b/README.md
@@ -702,7 +702,17 @@ Additionally, a unique hexadecimal 
uid.minifi.device.segment should be assigned
           Trigger Threshold: 90
           Low Battery Threshold: 50
           Wait Period: 500 ms
-
+### MQTT Controller service
+The MQTTController Service can be configured for MQTT connectivity and provide 
that capability to your processors when MQTT is built.
+    
+    Controller Services:
+    - name: mqttservice
+      id: 294491a38-015a-1000-0000-000000000001
+      class: MQTTContextService
+      Properties:
+          Broker URI: localhost:1883
+           Client ID: client ID
+          Quality of Service: 2
 ### Running
 After completing a [build](#building), the application can be run by issuing 
the following from :
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/bootstrap.sh
----------------------------------------------------------------------
diff --git a/bootstrap.sh b/bootstrap.sh
index 7706a58..1ea5646 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -185,6 +185,8 @@ elif [[ "$OS" = Deb* ]]; then
   . "${script_directory}/debian.sh"
 elif [[ "$OS" = Rasp* ]]; then
   . "${script_directory}/aptitude.sh"
+elif [[ "$OS" = Pop* ]]; then
+  . "${script_directory}/aptitude.sh"
 elif [[ "$OS" = Ubuntu* ]]; then
   . "${script_directory}/aptitude.sh"
 elif [[ "$OS" = *SUSE* ]]; then

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/AbstractMQTTProcessor.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/AbstractMQTTProcessor.cpp 
b/extensions/mqtt/AbstractMQTTProcessor.cpp
deleted file mode 100644
index 345c6c5..0000000
--- a/extensions/mqtt/AbstractMQTTProcessor.cpp
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * @file AbstractMQTTProcessor.cpp
- * AbstractMQTTProcessor class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "AbstractMQTTProcessor.h"
-#include <stdio.h>
-#include <memory>
-#include <string>
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use 
to connect to the MQTT broker", "");
-core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether 
to start afresh or resume previous flows. See the allowable value descriptions 
for more details", "true");
-core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to 
use", "");
-core::Property AbstractMQTTProcessor::UserName("Username", "Username to use 
when connecting to the broker", "");
-core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use 
when connecting to the broker", "");
-core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", 
"Defines the maximum time interval between messages sent or received", "60 
sec");
-core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", 
"Maximum time interval the client will wait for the network connection to the 
MQTT server", "30 sec");
-core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality 
of Service(QoS) to send the message with. Accepts three values '0', '1' and 
'2'", "MQTT_QOS_0");
-core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the 
message to", "");
-core::Property AbstractMQTTProcessor::SecurityProtocol("Security Protocol", 
"Protocol used to communicate with brokers", "");
-core::Property AbstractMQTTProcessor::SecurityCA("Security CA", "File or 
directory path to CA certificate(s) for verifying the broker's key", "");
-core::Property AbstractMQTTProcessor::SecurityCert("Security Cert", "Path to 
client's public key (PEM) used for authentication", "");
-core::Property AbstractMQTTProcessor::SecurityPrivateKey("Security Private 
Key", "Path to client's private key (PEM) used for authentication", "");
-core::Property AbstractMQTTProcessor::SecurityPrivateKeyPassWord("Security 
Pass Phrase", "Private key passphrase", "");
-core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that 
are sent successfully to the destination are transferred to this relationship");
-core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that 
failed to send to the destination are transferred to this relationship");
-
-void AbstractMQTTProcessor::initialize() {
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(BrokerURL);
-  properties.insert(CleanSession);
-  properties.insert(ClientID);
-  properties.insert(UserName);
-  properties.insert(PassWord);
-  properties.insert(KeepLiveInterval);
-  properties.insert(ConnectionTimeOut);
-  properties.insert(QOS);
-  properties.insert(Topic);
-  setSupportedProperties(properties);
-  // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Failure);
-  setSupportedRelationships(relationships);
-  MQTTClient_SSLOptions sslopts_ = MQTTClient_SSLOptions_initializer;
-  sslEnabled_ = false;
-}
-
-void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
-  std::string value;
-  int64_t valInt;
-  value = "";
-  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
-    uri_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
-  }
-  value = "";
-  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
-    clientID_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: ClientID [%s]", clientID_);
-  }
-  value = "";
-  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
-    topic_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: Topic [%s]", topic_);
-  }
-  value = "";
-  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
-    userName_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", userName_);
-  }
-  value = "";
-  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
-    passWord_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: PassWord [%s]", passWord_);
-  }
-  value = "";
-  if (context->getProperty(CleanSession.getName(), value) && !value.empty() &&
-      org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, 
cleanSession_)) {
-    logger_->log_debug("AbstractMQTTProcessor: CleanSession [%d]", 
cleanSession_);
-  }
-  value = "";
-  if (context->getProperty(KeepLiveInterval.getName(), value) && 
!value.empty()) {
-    core::TimeUnit unit;
-    if (core::Property::StringToTime(value, valInt, unit) && 
core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
-      keepAliveInterval_ = valInt/1000;
-      logger_->log_debug("AbstractMQTTProcessor: KeepLiveInterval [%ll]", 
keepAliveInterval_);
-    }
-  }
-  value = "";
-  if (context->getProperty(ConnectionTimeOut.getName(), value) && 
!value.empty()) {
-    core::TimeUnit unit;
-    if (core::Property::StringToTime(value, valInt, unit) && 
core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
-      connectionTimeOut_ = valInt/1000;
-      logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeOut [%ll]", 
connectionTimeOut_);
-    }
-  }
-  value = "";
-  if (context->getProperty(QOS.getName(), value) && !value.empty() && (value 
== MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) &&
-      core::Property::StringToInt(value, valInt)) {
-    qos_ = valInt;
-    logger_->log_debug("AbstractMQTTProcessor: QOS [%ll]", qos_);
-  }
-  value = "";
-
-  if (context->getProperty(SecurityProtocol.getName(), value) && 
!value.empty()) {
-    if (value == MQTT_SECURITY_PROTOCOL_SSL) {
-      sslEnabled_ = true;
-      logger_->log_debug("AbstractMQTTProcessor: ssl enable");
-      value = "";
-      if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) 
{
-        logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value);
-        securityCA_ = value;
-        sslopts_.trustStore = securityCA_.c_str();
-      }
-      value = "";
-      if (context->getProperty(SecurityCert.getName(), value) && 
!value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", value);
-        securityCert_ = value;
-        sslopts_.keyStore = securityCert_.c_str();
-      }
-      value = "";
-      if (context->getProperty(SecurityPrivateKey.getName(), value) && 
!value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", value);
-        securityPrivateKey_ = value;
-        sslopts_.privateKey = securityPrivateKey_.c_str();
-      }
-      value = "";
-      if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && 
!value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", 
value);
-        securityPrivateKeyPassWord_ = value;
-        sslopts_.privateKeyPassword = securityPrivateKeyPassWord_.c_str();
-      }
-    }
-  }
-  if (!client_) {
-    MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), 
MQTTCLIENT_PERSISTENCE_NONE, NULL);
-  }
-  if (client_) {
-    MQTTClient_setCallbacks(client_, (void *) this, connectionLost, 
msgReceived, msgDelivered);
-    // call reconnect to bootstrap
-    this->reconnect();
-  }
-}
-
-bool AbstractMQTTProcessor::reconnect() {
-  if (!client_)
-    return false;
-  if (MQTTClient_isConnected(client_))
-    return true;
-  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
-  conn_opts.keepAliveInterval = keepAliveInterval_;
-  conn_opts.cleansession = cleanSession_;
-  if (!userName_.empty()) {
-    conn_opts.username = userName_.c_str();
-    conn_opts.password = passWord_.c_str();
-  }
-  if (sslEnabled_)
-    conn_opts.ssl = &sslopts_;
-  if (MQTTClient_connect(client_, &conn_opts) != MQTTCLIENT_SUCCESS) {
-    logger_->log_error("Failed to connect to MQTT broker %s", uri_);
-    return false;
-  }
-  if (isSubscriber_) {
-    MQTTClient_subscribe(client_, topic_.c_str(), qos_);
-  }
-  return true;
-}
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/AbstractMQTTProcessor.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/AbstractMQTTProcessor.h 
b/extensions/mqtt/AbstractMQTTProcessor.h
deleted file mode 100644
index aab0ef5..0000000
--- a/extensions/mqtt/AbstractMQTTProcessor.h
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * @file AbstractMQTTProcessor.h
- * AbstractMQTTProcessor class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __ABSTRACTMQTT_H__
-#define __ABSTRACTMQTT_H__
-
-#include "FlowFileRecord.h"
-#include "core/Processor.h"
-#include "core/ProcessSession.h"
-#include "core/Core.h"
-#include "core/Resource.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "MQTTClient.h"
-
-#define MQTT_QOS_0 "0"
-#define MQTT_QOS_1 "1"
-#define MQTT_QOS_2 "2"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-#define MQTT_SECURITY_PROTOCOL_PLAINTEXT "plaintext"
-#define MQTT_SECURITY_PROTOCOL_SSL "ssl"
-
-// AbstractMQTTProcessor Class
-class AbstractMQTTProcessor : public core::Processor {
- public:
-  // Constructor
-  /*!
-   * Create a new processor
-   */
-  explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL)
-      : core::Processor(name, uuid),
-        logger_(logging::LoggerFactory<AbstractMQTTProcessor>::getLogger()) {
-    client_ = nullptr;
-    cleanSession_ = false;
-    keepAliveInterval_ = 60;
-    connectionTimeOut_ = 30;
-    qos_ = 0;
-    isSubscriber_ = false;
-  }
-  // Destructor
-  virtual ~AbstractMQTTProcessor() {
-    if (isSubscriber_) {
-      MQTTClient_unsubscribe(client_, topic_.c_str());
-    }
-    if (client_ && MQTTClient_isConnected(client_)) {
-      MQTTClient_disconnect(client_, connectionTimeOut_);
-    }
-    if (client_)
-      MQTTClient_destroy(&client_);
-  }
-  // Processor Name
-  static constexpr char const* ProcessorName = "AbstractMQTTProcessor";
-  // Supported Properties
-  static core::Property BrokerURL;
-  static core::Property ClientID;
-  static core::Property UserName;
-  static core::Property PassWord;
-  static core::Property CleanSession;
-  static core::Property KeepLiveInterval;
-  static core::Property ConnectionTimeOut;
-  static core::Property Topic;
-  static core::Property QOS;
-  static core::Property SecurityProtocol;
-  static core::Property SecurityCA;
-  static core::Property SecurityCert;
-  static core::Property SecurityPrivateKey;
-  static core::Property SecurityPrivateKeyPassWord;
-
-  // Supported Relationships
-  static core::Relationship Failure;
-  static core::Relationship Success;
-
- public:
-  /**
-   * Function that's executed when the processor is scheduled.
-   * @param context process context.
-   * @param sessionFactory process session factory that is used when creating
-   * ProcessSession objects.
-   */
-  virtual void onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory);
-  // OnTrigger method, implemented by NiFi AbstractMQTTProcessor
-  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession 
*session) {
-  }
-  // OnTrigger method, implemented by NiFi AbstractMQTTProcessor
-  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session) {
-  }
-  // Initialize, over write by NiFi AbstractMQTTProcessor
-  virtual void initialize(void);
-  // MQTT async callbacks
-  static void msgDelivered(void *context, MQTTClient_deliveryToken dt) {
-    AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context;
-    processor->delivered_token_ = dt;
-  }
-  static int msgReceived(void *context, char *topicName, int topicLen, 
MQTTClient_message *message) {
-    AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context;
-    if (processor->isSubscriber_) {
-      if (!processor->enqueueReceiveMQTTMsg(message))
-        MQTTClient_freeMessage(&message);
-    } else {
-      MQTTClient_freeMessage(&message);
-    }
-    MQTTClient_free(topicName);
-    return 1;
-  }
-  static void connectionLost(void *context, char *cause) {
-    AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context;
-    processor->reconnect();
-  }
-  bool reconnect();
-  // enqueue receive MQTT message
-  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message) {
-    return false;
-  }
-
- protected:
-  MQTTClient client_;
-  MQTTClient_deliveryToken delivered_token_;
-  std::string uri_;
-  std::string topic_;
-  int64_t keepAliveInterval_;
-  int64_t connectionTimeOut_;
-  int64_t qos_;
-  bool cleanSession_;
-  std::string clientID_;
-  std::string userName_;
-  std::string passWord_;
-  bool isSubscriber_;
-
- private:
-  std::shared_ptr<logging::Logger> logger_;
-  MQTTClient_SSLOptions sslopts_;
-  bool sslEnabled_;
-  std::string securityCA_;
-  std::string securityCert_;
-  std::string securityPrivateKey_;
-  std::string securityPrivateKeyPassWord_;
-};
-
-REGISTER_RESOURCE(AbstractMQTTProcessor);
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/mqtt/CMakeLists.txt b/extensions/mqtt/CMakeLists.txt
index d248524..3374557 100644
--- a/extensions/mqtt/CMakeLists.txt
+++ b/extensions/mqtt/CMakeLists.txt
@@ -20,11 +20,11 @@
 set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
 set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
 
-include_directories(../../libminifi/include  ../../libminifi/include/core  
../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue 
../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include 
${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/)
+include_directories(./controllerservice ./processors ./protocol 
../../libminifi/include  ../../libminifi/include/core  
../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue 
../../thirdparty/)
 
 include_directories(../../thirdparty/paho.mqtt.c/src)
 
-file(GLOB SOURCES  "*.cpp")
+file(GLOB SOURCES "*.cpp" "protocol/*.cpp" "processors/*.cpp" 
"controllerservice/*.cpp")
 
 add_library(minifi-mqtt-extensions STATIC ${SOURCES})
 set_property(TARGET minifi-mqtt-extensions PROPERTY POSITION_INDEPENDENT_CODE 
ON)
@@ -37,10 +37,6 @@ endif()
 
 
 # Include UUID
-find_package(UUID REQUIRED)
-target_link_libraries(minifi-mqtt-extensions ${LIBMINIFI} ${UUID_LIBRARIES} 
${JSONCPP_LIB})
-find_package(OpenSSL REQUIRED)
-include_directories(${OPENSSL_INCLUDE_DIR})
 target_link_libraries(minifi-mqtt-extensions ${CMAKE_DL_LIBS} )
 if (MQTT_FOUND AND NOT BUILD_MQTT)
        target_link_libraries(minifi-mqtt-extensions ${MQTT_LIBRARIES} )
@@ -55,7 +51,7 @@ include_directories(${ZLIB_INCLUDE_DIRS})
 target_link_libraries (minifi-mqtt-extensions ${ZLIB_LIBRARIES})
 if (WIN32)
     set_target_properties(minifi-mqtt-extensions PROPERTIES
-        LINK_FLAGS "/WHOLEMQTT"
+        LINK_FLAGS "/WHOLEARCHIVE"
     )
 elseif (APPLE)
     set_target_properties(minifi-mqtt-extensions PROPERTIES
@@ -63,7 +59,7 @@ elseif (APPLE)
     )
 else ()
     set_target_properties(minifi-mqtt-extensions PROPERTIES
-        LINK_FLAGS "-Wl,--whole-mqtt"
+        LINK_FLAGS "-Wl,--whole-archive"
     )
 endif ()
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/ConsumeMQTT.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/ConsumeMQTT.cpp b/extensions/mqtt/ConsumeMQTT.cpp
deleted file mode 100644
index 472d35f..0000000
--- a/extensions/mqtt/ConsumeMQTT.cpp
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * @file ConsumeMQTT.cpp
- * ConsumeMQTT class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ConsumeMQTT.h"
-#include <stdio.h>
-#include <algorithm>
-#include <memory>
-#include <string>
-#include <map>
-#include <set>
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-core::Property ConsumeMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum 
flow content payload segment size for the MQTT record", "");
-core::Property ConsumeMQTT::QueueBufferMaxMessage("Queue Max Message", 
"Maximum number of messages allowed on the received MQTT queue", "");
-
-void ConsumeMQTT::initialize() {
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(BrokerURL);
-  properties.insert(CleanSession);
-  properties.insert(ClientID);
-  properties.insert(UserName);
-  properties.insert(PassWord);
-  properties.insert(KeepLiveInterval);
-  properties.insert(ConnectionTimeOut);
-  properties.insert(QOS);
-  properties.insert(Topic);
-  properties.insert(MaxFlowSegSize);
-  properties.insert(QueueBufferMaxMessage);
-  setSupportedProperties(properties);
-  // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  setSupportedRelationships(relationships);
-}
-
-bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-    logger_->log_debug("MQTT queue full");
-    return false;
-  } else {
-    if (message->payloadlen > maxSegSize_)
-      message->payloadlen = maxSegSize_;
-    queue_.enqueue(message);
-    logger_->log_debug("enqueue MQTT message length %d", message->payloadlen);
-    return true;
-  }
-}
-
-void ConsumeMQTT::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
-  AbstractMQTTProcessor::onSchedule(context, sessionFactory);
-  std::string value;
-  int64_t valInt;
-  value = "";
-  if (context->getProperty(QueueBufferMaxMessage.getName(), value) && 
!value.empty() && core::Property::StringToInt(value, valInt)) {
-    maxQueueSize_ = valInt;
-    logger_->log_debug("ConsumeMQTT: Queue Max Message [%ll]", maxQueueSize_);
-  }
-  value = "";
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() 
&& core::Property::StringToInt(value, valInt)) {
-    maxSegSize_ = valInt;
-    logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%ll]", 
maxSegSize_);
-  }
-}
-
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if necessary
-  reconnect();
-  std::deque<MQTTClient_message *> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
-  while (!msg_queue.empty()) {
-    MQTTClient_message *message = msg_queue.front();
-    std::shared_ptr<core::FlowFile> processFlowFile = session->create();
-    ConsumeMQTT::WriteCallback callback(message);
-    session->write(processFlowFile, &callback);
-    if (callback.status_ < 0) {
-      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-      session->remove(processFlowFile);
-    } else {
-      session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, 
uri_.c_str());
-      session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, 
topic_.c_str());
-      logger_->log_debug("ConsumeMQTT processing success for the flow with 
UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
-      session->transfer(processFlowFile, Success);
-    }
-    MQTTClient_freeMessage(&message);
-    msg_queue.pop_front();
-  }
-}
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/ConsumeMQTT.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/ConsumeMQTT.h b/extensions/mqtt/ConsumeMQTT.h
deleted file mode 100644
index 0b26d42..0000000
--- a/extensions/mqtt/ConsumeMQTT.h
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * @file ConsumeMQTT.h
- * ConsumeMQTT class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __CONSUME_MQTT_H__
-#define __CONSUME_MQTT_H__
-
-#include <limits>
-#include <deque>
-#include "FlowFileRecord.h"
-#include "core/Processor.h"
-#include "core/ProcessSession.h"
-#include "core/Core.h"
-#include "core/Resource.h"
-#include "core/Property.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "concurrentqueue.h"
-#include "MQTTClient.h"
-#include "AbstractMQTTProcessor.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic"
-#define MQTT_BROKER_ATTRIBUTE "mqtt.broker"
-
-// ConsumeMQTT Class
-class ConsumeMQTT: public processors::AbstractMQTTProcessor {
-public:
-  // Constructor
-  /*!
-   * Create a new processor
-   */
-  explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL)
-    : processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory<ConsumeMQTT>::getLogger()) {
-    isSubscriber_ = true;
-    maxQueueSize_ = 100;
-    maxSegSize_ = ULLONG_MAX;
-  }
-  // Destructor
-  virtual ~ConsumeMQTT() {
-    MQTTClient_message *message;
-    while (queue_.try_dequeue(message)) {
-      MQTTClient_freeMessage(&message);
-    }
-  }
-  // Processor Name
-  static constexpr char const* ProcessorName = "ConsumeMQTT";
-  // Supported Properties
-  static core::Property MaxFlowSegSize;
-  static core::Property QueueBufferMaxMessage;
-  // Nest Callback Class for write stream
-  class WriteCallback: public OutputStreamCallback {
-  public:
-    WriteCallback(MQTTClient_message *message) :
-      message_(message) {
-      status_ = 0;
-    }
-    MQTTClient_message *message_;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      int64_t len = 
stream->write(reinterpret_cast<uint8_t*>(message_->payload), 
message_->payloadlen);
-      if (len < 0)
-        status_ = -1;
-      return len;
-    }
-    int status_;
-  };
-
-public:
-  /**
-   * Function that's executed when the processor is scheduled.
-   * @param context process context.
-   * @param sessionFactory process session factory that is used when creating
-   * ProcessSession objects.
-   */
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory 
*sessionFactory);
-  // OnTrigger method, implemented by NiFi ConsumeMQTT
-  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session);
-  // Initialize, over write by NiFi ConsumeMQTT
-  virtual void initialize(void);
-  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message);
-
-protected:
-  void getReceivedMQTTMsg(std::deque<MQTTClient_message *> &msg_queue) {
-    MQTTClient_message *message;
-    while (queue_.try_dequeue(message)) {
-      msg_queue.push_back(message);
-    }
-  }
-
-private:
-  std::shared_ptr<logging::Logger> logger_;
-  std::mutex mutex_;
-  uint64_t maxQueueSize_;
-  uint64_t maxSegSize_;
-  moodycamel::ConcurrentQueue<MQTTClient_message *> queue_;
-};
-
-REGISTER_RESOURCE (ConsumeMQTT);
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/MQTTLoader.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/MQTTLoader.h b/extensions/mqtt/MQTTLoader.h
index d337af5..d8d3e8f 100644
--- a/extensions/mqtt/MQTTLoader.h
+++ b/extensions/mqtt/MQTTLoader.h
@@ -18,10 +18,14 @@
 #ifndef EXTENSION_MQTTLOADER_H
 #define EXTENSION_MQTTLOADER_H
 
-#include "PublishMQTT.h"
-#include "ConsumeMQTT.h"
+#include "controllerservice/MQTTControllerService.h"
+#include "processors/PublishMQTT.h"
+#include "processors/ConsumeMQTT.h"
+#include "MQTTC2Protocol.h"
 #include "core/ClassLoader.h"
-
+#include "ConvertHeartBeat.h"
+#include "ConvertJSONAck.h"
+#include "ConvertUpdate.h"
 class __attribute__((visibility("default"))) MQTTFactory : public 
core::ObjectFactory {
  public:
   MQTTFactory() {
@@ -47,17 +51,30 @@ class __attribute__((visibility("default"))) MQTTFactory : 
public core::ObjectFa
     std::vector<std::string> class_names;
     class_names.push_back("PublishMQTT");
     class_names.push_back("ConsumeMQTT");
+    class_names.push_back("MQTTContextService");
+    class_names.push_back("MQTTC2Protocol");
+    class_names.push_back("ConvertHeartBeat");
+    class_names.push_back("ConvertJSONAck");
+    class_names.push_back("ConvertUpdate");
     return class_names;
   }
 
   virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) 
{
     if (utils::StringUtils::equalsIgnoreCase(class_name, "PublishMQTT")) {
       return std::unique_ptr<ObjectFactory>(new 
core::DefautObjectFactory<minifi::processors::PublishMQTT>());
-    }
-    else if (utils::StringUtils::equalsIgnoreCase(class_name, "ConsumeMQTT")) {
+    } else if (utils::StringUtils::equalsIgnoreCase(class_name, 
"ConsumeMQTT")) {
       return std::unique_ptr<ObjectFactory>(new 
core::DefautObjectFactory<minifi::processors::ConsumeMQTT>());
-    }
-    else {
+    } else if (utils::StringUtils::equalsIgnoreCase(class_name, 
"MQTTContextService")) {
+      return std::unique_ptr<ObjectFactory>(new 
core::DefautObjectFactory<minifi::controllers::MQTTControllerService>());
+    } else if (utils::StringUtils::equalsIgnoreCase(class_name, 
"MQTTC2Protocol")) {
+      return std::unique_ptr<ObjectFactory>(new 
core::DefautObjectFactory<minifi::c2::MQTTC2Protocol>());
+    } else if (utils::StringUtils::equalsIgnoreCase(class_name, 
"ConvertHeartBeat")) {
+      return std::unique_ptr<ObjectFactory>(new 
core::DefautObjectFactory<minifi::processors::ConvertHeartBeat>());
+    } else if (utils::StringUtils::equalsIgnoreCase(class_name, 
"ConvertJSONAck")) {
+      return std::unique_ptr<ObjectFactory>(new 
core::DefautObjectFactory<minifi::processors::ConvertJSONAck>());
+    } else if (utils::StringUtils::equalsIgnoreCase(class_name, 
"ConvertUpdate")) {
+          return std::unique_ptr<ObjectFactory>(new 
core::DefautObjectFactory<minifi::processors::ConvertUpdate>());
+    } else {
       return nullptr;
     }
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/PublishMQTT.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/PublishMQTT.cpp b/extensions/mqtt/PublishMQTT.cpp
deleted file mode 100644
index 411cc2d..0000000
--- a/extensions/mqtt/PublishMQTT.cpp
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * @file PublishMQTT.cpp
- * PublishMQTT class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "PublishMQTT.h"
-#include <stdio.h>
-#include <algorithm>
-#include <memory>
-#include <string>
-#include <map>
-#include <set>
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-core::Property PublishMQTT::Retain("Retain", "Retain MQTT published record in 
broker", "false");
-core::Property PublishMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum 
flow content payload segment size for the MQTT record", "");
-
-void PublishMQTT::initialize() {
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(BrokerURL);
-  properties.insert(CleanSession);
-  properties.insert(ClientID);
-  properties.insert(UserName);
-  properties.insert(PassWord);
-  properties.insert(KeepLiveInterval);
-  properties.insert(ConnectionTimeOut);
-  properties.insert(QOS);
-  properties.insert(Topic);
-  properties.insert(Retain);
-  properties.insert(MaxFlowSegSize);
-  setSupportedProperties(properties);
-  // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Failure);
-  setSupportedRelationships(relationships);
-}
-
-void PublishMQTT::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
-  AbstractMQTTProcessor::onSchedule(context, sessionFactory);
-  std::string value;
-  int64_t valInt;
-  value = "";
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() 
&& core::Property::StringToInt(value, valInt)) {
-    max_seg_size_ = valInt;
-    logger_->log_debug("PublishMQTT: max flow segment size [%ll]", 
max_seg_size_);
-  }
-  value = "";
-  if (context->getProperty(Retain.getName(), value) && !value.empty() && 
org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, retain_)) {
-    logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
-  }
-}
-
-void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSession> &session) {
-  std::shared_ptr<core::FlowFile> flowFile = session->get();
-
-  if (!flowFile) {
-    return;
-  }
-
-  if (!reconnect()) {
-    logger_->log_error("MQTT connect to %s failed", uri_);
-    session->transfer(flowFile, Failure);
-    return;
-  }
-
-  PublishMQTT::ReadCallback callback(flowFile->getSize(), max_seg_size_, 
topic_, client_, qos_, retain_, delivered_token_);
-  session->read(flowFile, &callback);
-  if (callback.status_ < 0) {
-    logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
-    session->transfer(flowFile, Failure);
-  } else {
-    logger_->log_debug("Sent flow with length %d to MQTT topic %s", 
callback.read_size_, topic_);
-    session->transfer(flowFile, Success);
-  }
-}
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/PublishMQTT.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/PublishMQTT.h b/extensions/mqtt/PublishMQTT.h
deleted file mode 100644
index ed17bd4..0000000
--- a/extensions/mqtt/PublishMQTT.h
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * @file PublishMQTT.h
- * PublishMQTT class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __PUBLISH_MQTT_H__
-#define __PUBLISH_MQTT_H__
-
-#include "FlowFileRecord.h"
-#include "core/Processor.h"
-#include "core/ProcessSession.h"
-#include "core/Core.h"
-#include "core/Resource.h"
-#include "core/Property.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "MQTTClient.h"
-#include "AbstractMQTTProcessor.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-// PublishMQTT Class
-class PublishMQTT: public processors::AbstractMQTTProcessor {
-public:
-  // Constructor
-  /*!
-   * Create a new processor
-   */
-  explicit PublishMQTT(std::string name, uuid_t uuid = NULL)
-    : processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory<PublishMQTT>::getLogger()) {
-    retain_ = false;
-    max_seg_size_ = ULLONG_MAX;
-  }
-  // Destructor
-  virtual ~PublishMQTT() {
-  }
-  // Processor Name
-  static constexpr char const* ProcessorName = "PublishMQTT";
-  // Supported Properties
-  static core::Property Retain;
-  static core::Property MaxFlowSegSize;
-
-  // Nest Callback Class for read stream
-  class ReadCallback: public InputStreamCallback {
-  public:
-    ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string 
&key, MQTTClient client,
-        int qos, bool retain, MQTTClient_deliveryToken &token) :
-        flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), 
client_(client),
-        qos_(qos), retain_(retain), token_(token) {
-      status_ = 0;
-      read_size_ = 0;
-    }
-    ~ReadCallback() {
-    }
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      if (flow_size_ < max_seg_size_)
-        max_seg_size_ = flow_size_;
-      std::vector<unsigned char> buffer;
-      buffer.reserve(max_seg_size_);
-      read_size_ = 0;
-      status_ = 0;
-      while (read_size_ < flow_size_) {
-        int readRet = stream->read(&buffer[0], max_seg_size_);
-        if (readRet < 0) {
-          status_ = -1;
-          return read_size_;
-        }
-        if (readRet > 0) {
-          MQTTClient_message pubmsg = MQTTClient_message_initializer;
-          pubmsg.payload = &buffer[0];
-          pubmsg.payloadlen = readRet;
-          pubmsg.qos = qos_;
-          pubmsg.retained = retain_;
-          if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg, 
&token_) != MQTTCLIENT_SUCCESS) {
-            status_ = -1;
-            return -1;
-          }
-          read_size_ += readRet;
-        } else {
-          break;
-        }
-      }
-      return read_size_;
-    }
-    uint64_t flow_size_;
-    uint64_t max_seg_size_;
-    std::string key_;
-    MQTTClient client_;;
-    int status_;
-    int read_size_;
-    int qos_;
-    int retain_;
-    MQTTClient_deliveryToken &token_;
-  };
-
-public:
-  /**
-   * Function that's executed when the processor is scheduled.
-   * @param context process context.
-   * @param sessionFactory process session factory that is used when creating
-   * ProcessSession objects.
-   */
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory 
*sessionFactory);
-  // OnTrigger method, implemented by NiFi PublishMQTT
-  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session);
-  // Initialize, over write by NiFi PublishMQTT
-  virtual void initialize(void);
-
-protected:
-
-private:
-  uint64_t max_seg_size_;
-  bool retain_;
-  std::shared_ptr<logging::Logger> logger_;
-};
-
-REGISTER_RESOURCE (PublishMQTT);
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/controllerservice/MQTTControllerService.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/controllerservice/MQTTControllerService.cpp 
b/extensions/mqtt/controllerservice/MQTTControllerService.cpp
new file mode 100644
index 0000000..b67fc6b
--- /dev/null
+++ b/extensions/mqtt/controllerservice/MQTTControllerService.cpp
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MQTTControllerService.h"
+
+#include <openssl/err.h>
+#include <openssl/ssl.h>
+#include <string>
+#include <memory>
+#include <set>
+#include "core/Property.h"
+#include "io/validation.h"
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+core::Property MQTTControllerService::BrokerURL("Broker URI", "The URI to use 
to connect to the MQTT broker", "");
+core::Property MQTTControllerService::ClientID("Client ID", "MQTT client ID to 
use", "");
+core::Property MQTTControllerService::UserName("Username", "Username to use 
when connecting to the broker", "");
+core::Property MQTTControllerService::Password("Password", "Password to use 
when connecting to the broker", "");
+core::Property MQTTControllerService::KeepLiveInterval("Keep Alive Interval", 
"Defines the maximum time interval between messages sent or received", "60 
sec");
+core::Property MQTTControllerService::ConnectionTimeOut("Connection Timeout", 
"Maximum time interval the client will wait for the network connection to the 
MQTT server", "30 sec");
+core::Property MQTTControllerService::QOS("Quality of Service", "The Quality 
of Service(QoS) to send the message with. Accepts three values '0', '1' and 
'2'", "MQTT_QOS_0");
+core::Property MQTTControllerService::Topic("Topic", "The topic to publish the 
message to", "");
+core::Property MQTTControllerService::SecurityProtocol("Security Protocol", 
"Protocol used to communicate with brokers", "");
+
+void MQTTControllerService::initialize() {
+  if (initialized_)
+    return;
+
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
+
+  ControllerService::initialize();
+
+  initializeProperties();
+
+  initialized_ = true;
+}
+
+void MQTTControllerService::onEnable() {
+  for (auto &linked_service : linked_services_) {
+    std::shared_ptr<controllers::SSLContextService> ssl_service = 
std::dynamic_pointer_cast<controllers::SSLContextService>(linked_service);
+    if (nullptr != ssl_service) {
+      // security is enabled.
+      ssl_context_service_ = ssl_service;
+    }
+  }
+  if (getProperty(BrokerURL.getName(), uri_) && 
getProperty(ClientID.getName(), clientID_)) {
+    if (!client_) {
+      MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), 
MQTTCLIENT_PERSISTENCE_NONE, NULL);
+    }
+
+    if (client_) {
+      MQTTClient_setCallbacks(client_, (void *) this, reconnectCallback, 
receiveCallback, deliveryCallback);
+      // call reconnect to bootstrap
+      this->reconnect();
+    }
+
+  }
+}
+
+void MQTTControllerService::initializeProperties() {
+  std::set<core::Property> supportedProperties;
+  supportedProperties.insert(BrokerURL);
+  supportedProperties.insert(ClientID);
+  supportedProperties.insert(UserName);
+  supportedProperties.insert(Password);
+
+  supportedProperties.insert(KeepLiveInterval);
+  supportedProperties.insert(ConnectionTimeOut);
+  supportedProperties.insert(Topic);
+  supportedProperties.insert(QOS);
+  supportedProperties.insert(SecurityProtocol);
+  setSupportedProperties(supportedProperties);
+}
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/controllerservice/MQTTControllerService.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/controllerservice/MQTTControllerService.h 
b/extensions/mqtt/controllerservice/MQTTControllerService.h
new file mode 100644
index 0000000..a7bbebd
--- /dev/null
+++ b/extensions/mqtt/controllerservice/MQTTControllerService.h
@@ -0,0 +1,342 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_MQTTCONTEXTSERVICE_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_MQTTCONTEXTSERVICE_H_
+
+#include <openssl/err.h>
+#include <openssl/ssl.h>
+#include <iostream>
+#include <memory>
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "io/validation.h"
+#include "core/controller/ControllerService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "controllers/SSLContextService.h"
+#include "concurrentqueue.h"
+#include "MQTTClient.h"
+
+#define MQTT_QOS_0 "0"
+#define MQTT_QOS_1 "1"
+#define MQTT_QOS_2 "2"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+class Message {
+ public:
+  // empty constructor facilitates moves
+  Message() {
+  }
+  explicit Message(const std::string &topic, void *data, size_t dataLen)
+      : topic_(topic),
+        data_((uint8_t*) data, ((uint8_t*)data + dataLen)) {
+  }
+  explicit Message(const Message &&other)
+      : topic_(std::move(other.topic_)),
+        data_(std::move(other.data_)) {
+  }
+  ~Message() {
+  }
+
+  Message &operator=(const Message &&other) {
+    topic_ = std::move(other.topic_);
+    data_ = std::move(other.data_);
+    return *this;
+  }
+  std::string topic_;
+  std::vector<uint8_t> data_;
+};
+
+/**
+ * MQTTContextService provides a controller service for MQTT connectivity.
+ *
+ */
+class MQTTControllerService : public core::controller::ControllerService {
+ public:
+  explicit MQTTControllerService(const std::string &name, const std::string 
&id)
+      : ControllerService(name, id),
+        initialized_(false),
+        client_(nullptr),
+        keepAliveInterval_(0),
+        connectionTimeOut_(0),
+        qos_(2),
+        ssl_context_service_(nullptr),
+        logger_(logging::LoggerFactory<MQTTControllerService>::getLogger()) {
+  }
+
+  explicit MQTTControllerService(const std::string &name, uuid_t uuid = 0)
+      : ControllerService(name, uuid),
+        initialized_(false),
+        client_(nullptr),
+        keepAliveInterval_(0),
+        connectionTimeOut_(0),
+        qos_(2),
+        ssl_context_service_(nullptr),
+        logger_(logging::LoggerFactory<MQTTControllerService>::getLogger()) {
+  }
+
+  explicit MQTTControllerService(const std::string &name, const 
std::shared_ptr<Configure> &configuration)
+      : ControllerService(name, nullptr),
+        initialized_(false),
+        client_(nullptr),
+        keepAliveInterval_(0),
+        connectionTimeOut_(0),
+        qos_(2),
+        ssl_context_service_(nullptr),
+        logger_(logging::LoggerFactory<MQTTControllerService>::getLogger()) {
+    setConfiguration(configuration);
+    initialize();
+  }
+
+  static core::Property BrokerURL;
+  static core::Property ClientID;
+  static core::Property UserName;
+  static core::Property Password;
+  static core::Property CleanSession;
+  static core::Property KeepLiveInterval;
+  static core::Property ConnectionTimeOut;
+  static core::Property Topic;
+  static core::Property QOS;
+  static core::Property SecurityProtocol;
+
+  virtual void initialize();
+
+  void yield() {
+
+  }
+
+  int send(const std::string &topic, const std::vector<uint8_t> &data) {
+    int token;
+    MQTTClient_message pubmsg = MQTTClient_message_initializer;
+    const uint8_t *d = data.data();
+    pubmsg.payload = const_cast<uint8_t*>(d);
+    pubmsg.payloadlen = data.size();
+    pubmsg.qos = qos_;
+    pubmsg.retained = 0;
+
+    auto resp = MQTTClient_publishMessage(client_, topic.c_str(), &pubmsg, 
&token);
+    if (resp != MQTTCLIENT_SUCCESS) {
+      return -1;
+    }
+    if (qos_ == 0) {
+      std::unique_lock<std::mutex> lock(delivery_mutex_);
+      delivered_[token] = true;
+    }
+    return token;
+  }
+
+  int send(const std::string &topic, const uint8_t *data, size_t dataSize) {
+    int token;
+
+    MQTTClient_message pubmsg = MQTTClient_message_initializer;
+    pubmsg.payload = const_cast<uint8_t*>(data);
+    pubmsg.payloadlen = dataSize;
+    pubmsg.qos = qos_;
+    pubmsg.retained = 0;
+
+    auto resp = MQTTClient_publishMessage(client_, topic.c_str(), &pubmsg, 
&token);
+    if (resp != MQTTCLIENT_SUCCESS) {
+      return -1;
+    }
+
+    if (qos_ == 0) {
+      std::unique_lock<std::mutex> lock(delivery_mutex_);
+      delivered_[token] = true;
+    }
+    return token;
+  }
+
+  bool isRunning() {
+    return getState() == core::controller::ControllerServiceState::ENABLED;
+  }
+
+  bool isWorkAvailable() {
+    return false;
+  }
+
+  virtual void onEnable();
+
+  void subscribeToTopic(const std::string newTopic) {
+    std::lock_guard<std::mutex> lock(initialization_mutex_);
+    if (topics_.find(newTopic) == topics_.end()) {
+      MQTTClient_subscribe(client_, newTopic.c_str(), qos_);
+      topics_[newTopic].size_approx();
+    }
+  }
+
+  bool waitForDelivery(const uint64_t millisToWait, int token) {
+    std::unique_lock<std::mutex> lock(delivery_mutex_);
+    if (delivery_notification_.wait_for(lock, 
std::chrono::milliseconds(millisToWait), [&] {return delivered_[token] == 
true;})) {
+      bool delivered = delivered_[token];
+      delivered_.erase(token);
+      return delivered;
+    } else {
+      delivered_.erase(token);
+      return false;
+    }
+  }
+
+  bool get(const uint64_t millisToWait, const std::string &topic, 
std::vector<uint8_t> &data) {
+    std::unique_lock<std::mutex> lock(delivery_mutex_);
+    if (delivery_notification_.wait_for(lock, 
std::chrono::milliseconds(millisToWait), [&] {return 
topics_[topic].size_approx() > 0;})) {
+      Message resp;
+      if (topics_[topic].try_dequeue(resp)) {
+        data = std::move(resp.data_);
+        return true;
+      } else {
+        return false;
+      }
+    } else {
+      return false;
+    }
+  }
+
+  bool awaitResponse(const uint64_t millisToWait, int token, const std::string 
&topic, std::vector<uint8_t> &data) {
+    std::unique_lock<std::mutex> lock(delivery_mutex_);
+    if (delivery_notification_.wait_for(lock, 
std::chrono::milliseconds(millisToWait), [&] {
+      return
+      delivered_[token] == true;
+    })) {
+      bool delivered = delivered_[token];
+      if (delivered) {
+        if (delivery_notification_.wait_for(lock, 
std::chrono::milliseconds(millisToWait), [&] {return 
topics_[topic].size_approx() > 0;})) {
+          Message resp;
+          if (topics_[topic].try_dequeue(resp)) {
+            data = std::move(resp.data_);
+            return true;
+          } else {
+            return false;
+          }
+        } else {
+          return false;
+        }
+      }
+      delivered_.erase(token);
+      return delivered;
+    } else {
+      delivered_.erase(token);
+      return false;
+    }
+  }
+
+ protected:
+
+  void acknowledgeDelivery(MQTTClient_deliveryToken token) {
+    std::lock_guard<std::mutex> lock(delivery_mutex_);
+    // locked the mutex
+    auto finder = delivered_.find(token);
+    // only acknowledge delivery if we expect the delivery to occur, otherwise
+    // we won't have any waiters.
+    if (finder != delivered_.end()) {
+      delivered_[token] = true;
+    }
+  }
+
+  void enqueue(const std::string &topic, Message &&message) {
+    std::unique_lock<std::mutex> lock(delivery_mutex_);
+    topics_[topic].enqueue(std::move(message));
+    delivery_notification_.notify_one();
+  }
+
+  static void deliveryCallback(void *context, MQTTClient_deliveryToken dt) {
+    MQTTControllerService *service = (MQTTControllerService *) context;
+    service->acknowledgeDelivery(dt);
+  }
+
+  static int receiveCallback(void *context, char *topicName, int topicLen, 
MQTTClient_message *message) {
+    MQTTControllerService *service = (MQTTControllerService *) context;
+    std::string topic(topicName, topicLen == 0 ? strlen(topicName) : topicLen);
+    Message queueMessage(topic, message->payload, message->payloadlen);
+    service->enqueue(topic, std::move(queueMessage));
+    MQTTClient_freeMessage(&message);
+    MQTTClient_free(topicName);
+    return 1;
+  }
+  static void reconnectCallback(void *context, char *cause) {
+    MQTTControllerService *service = (MQTTControllerService *) context;
+    service->reconnect();
+  }
+
+  bool reconnect() {
+    if (!client_)
+      return false;
+    if (MQTTClient_isConnected(client_))
+      return true;
+    MQTTClient_connectOptions conn_opts = 
MQTTClient_connectOptions_initializer;
+    conn_opts.keepAliveInterval = keepAliveInterval_;
+    conn_opts.cleansession = 1;
+    if (!userName_.empty()) {
+      conn_opts.username = userName_.c_str();
+      conn_opts.password = passWord_.c_str();
+    }
+    if (ssl_context_service_ != nullptr)
+      conn_opts.ssl = &sslopts_;
+    if (MQTTClient_connect(client_, &conn_opts) != MQTTCLIENT_SUCCESS) {
+      logger_->log_error("Failed to connect to MQTT broker %s", uri_);
+      return false;
+    }
+
+    if (!topic_.empty()) {
+      std::unique_lock<std::mutex> lock(delivery_mutex_);
+      MQTTClient_subscribe(client_, topic_.c_str(), qos_);
+    }
+    return true;
+  }
+
+  virtual void initializeProperties();
+
+  std::mutex initialization_mutex_;
+  std::atomic<bool> initialized_;
+
+  MQTTClient client_;
+  std::string uri_;
+  std::string topic_;
+  int64_t keepAliveInterval_;
+  int64_t connectionTimeOut_;
+  int64_t qos_;
+  std::string clientID_;
+  std::string userName_;
+  std::string passWord_;
+
+ private:
+
+  std::map<int, bool> delivered_;
+  std::map<std::string, moodycamel::ConcurrentQueue<Message> > topics_;
+
+  std::mutex delivery_mutex_;
+  std::condition_variable delivery_notification_;
+
+  MQTTClient_SSLOptions sslopts_;
+
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+
+  std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_MQTTCONTEXTSERVICE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp 
b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
new file mode 100644
index 0000000..345c6c5
--- /dev/null
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
@@ -0,0 +1,199 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include <stdio.h>
+#include <memory>
+#include <string>
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use 
to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether 
to start afresh or resume previous flows. See the allowable value descriptions 
for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to 
use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to use 
when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use 
when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", 
"Defines the maximum time interval between messages sent or received", "60 
sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", 
"Maximum time interval the client will wait for the network connection to the 
MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality 
of Service(QoS) to send the message with. Accepts three values '0', '1' and 
'2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the 
message to", "");
+core::Property AbstractMQTTProcessor::SecurityProtocol("Security Protocol", 
"Protocol used to communicate with brokers", "");
+core::Property AbstractMQTTProcessor::SecurityCA("Security CA", "File or 
directory path to CA certificate(s) for verifying the broker's key", "");
+core::Property AbstractMQTTProcessor::SecurityCert("Security Cert", "Path to 
client's public key (PEM) used for authentication", "");
+core::Property AbstractMQTTProcessor::SecurityPrivateKey("Security Private 
Key", "Path to client's private key (PEM) used for authentication", "");
+core::Property AbstractMQTTProcessor::SecurityPrivateKeyPassWord("Security 
Pass Phrase", "Private key passphrase", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that 
are sent successfully to the destination are transferred to this relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that 
failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+  MQTTClient_SSLOptions sslopts_ = MQTTClient_SSLOptions_initializer;
+  sslEnabled_ = false;
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+    uri_ = value;
+    logger_->log_debug("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+    clientID_ = value;
+    logger_->log_debug("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+    topic_ = value;
+    logger_->log_debug("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
+    userName_ = value;
+    logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", userName_);
+  }
+  value = "";
+  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
+    passWord_ = value;
+    logger_->log_debug("AbstractMQTTProcessor: PassWord [%s]", passWord_);
+  }
+  value = "";
+  if (context->getProperty(CleanSession.getName(), value) && !value.empty() &&
+      org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, 
cleanSession_)) {
+    logger_->log_debug("AbstractMQTTProcessor: CleanSession [%d]", 
cleanSession_);
+  }
+  value = "";
+  if (context->getProperty(KeepLiveInterval.getName(), value) && 
!value.empty()) {
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(value, valInt, unit) && 
core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
+      keepAliveInterval_ = valInt/1000;
+      logger_->log_debug("AbstractMQTTProcessor: KeepLiveInterval [%ll]", 
keepAliveInterval_);
+    }
+  }
+  value = "";
+  if (context->getProperty(ConnectionTimeOut.getName(), value) && 
!value.empty()) {
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(value, valInt, unit) && 
core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
+      connectionTimeOut_ = valInt/1000;
+      logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeOut [%ll]", 
connectionTimeOut_);
+    }
+  }
+  value = "";
+  if (context->getProperty(QOS.getName(), value) && !value.empty() && (value 
== MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) &&
+      core::Property::StringToInt(value, valInt)) {
+    qos_ = valInt;
+    logger_->log_debug("AbstractMQTTProcessor: QOS [%ll]", qos_);
+  }
+  value = "";
+
+  if (context->getProperty(SecurityProtocol.getName(), value) && 
!value.empty()) {
+    if (value == MQTT_SECURITY_PROTOCOL_SSL) {
+      sslEnabled_ = true;
+      logger_->log_debug("AbstractMQTTProcessor: ssl enable");
+      value = "";
+      if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) 
{
+        logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value);
+        securityCA_ = value;
+        sslopts_.trustStore = securityCA_.c_str();
+      }
+      value = "";
+      if (context->getProperty(SecurityCert.getName(), value) && 
!value.empty()) {
+        logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", value);
+        securityCert_ = value;
+        sslopts_.keyStore = securityCert_.c_str();
+      }
+      value = "";
+      if (context->getProperty(SecurityPrivateKey.getName(), value) && 
!value.empty()) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", value);
+        securityPrivateKey_ = value;
+        sslopts_.privateKey = securityPrivateKey_.c_str();
+      }
+      value = "";
+      if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && 
!value.empty()) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", 
value);
+        securityPrivateKeyPassWord_ = value;
+        sslopts_.privateKeyPassword = securityPrivateKeyPassWord_.c_str();
+      }
+    }
+  }
+  if (!client_) {
+    MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), 
MQTTCLIENT_PERSISTENCE_NONE, NULL);
+  }
+  if (client_) {
+    MQTTClient_setCallbacks(client_, (void *) this, connectionLost, 
msgReceived, msgDelivered);
+    // call reconnect to bootstrap
+    this->reconnect();
+  }
+}
+
+bool AbstractMQTTProcessor::reconnect() {
+  if (!client_)
+    return false;
+  if (MQTTClient_isConnected(client_))
+    return true;
+  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
+  conn_opts.keepAliveInterval = keepAliveInterval_;
+  conn_opts.cleansession = cleanSession_;
+  if (!userName_.empty()) {
+    conn_opts.username = userName_.c_str();
+    conn_opts.password = passWord_.c_str();
+  }
+  if (sslEnabled_)
+    conn_opts.ssl = &sslopts_;
+  if (MQTTClient_connect(client_, &conn_opts) != MQTTCLIENT_SUCCESS) {
+    logger_->log_error("Failed to connect to MQTT broker %s", uri_);
+    return false;
+  }
+  if (isSubscriber_) {
+    MQTTClient_subscribe(client_, topic_.c_str(), qos_);
+  }
+  return true;
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/AbstractMQTTProcessor.h
----------------------------------------------------------------------
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.h 
b/extensions/mqtt/processors/AbstractMQTTProcessor.h
new file mode 100644
index 0000000..aab0ef5
--- /dev/null
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.h
@@ -0,0 +1,168 @@
+/**
+ * @file AbstractMQTTProcessor.h
+ * AbstractMQTTProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __ABSTRACTMQTT_H__
+#define __ABSTRACTMQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+
+#define MQTT_QOS_0 "0"
+#define MQTT_QOS_1 "1"
+#define MQTT_QOS_2 "2"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define MQTT_SECURITY_PROTOCOL_PLAINTEXT "plaintext"
+#define MQTT_SECURITY_PROTOCOL_SSL "ssl"
+
+// AbstractMQTTProcessor Class
+class AbstractMQTTProcessor : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL)
+      : core::Processor(name, uuid),
+        logger_(logging::LoggerFactory<AbstractMQTTProcessor>::getLogger()) {
+    client_ = nullptr;
+    cleanSession_ = false;
+    keepAliveInterval_ = 60;
+    connectionTimeOut_ = 30;
+    qos_ = 0;
+    isSubscriber_ = false;
+  }
+  // Destructor
+  virtual ~AbstractMQTTProcessor() {
+    if (isSubscriber_) {
+      MQTTClient_unsubscribe(client_, topic_.c_str());
+    }
+    if (client_ && MQTTClient_isConnected(client_)) {
+      MQTTClient_disconnect(client_, connectionTimeOut_);
+    }
+    if (client_)
+      MQTTClient_destroy(&client_);
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "AbstractMQTTProcessor";
+  // Supported Properties
+  static core::Property BrokerURL;
+  static core::Property ClientID;
+  static core::Property UserName;
+  static core::Property PassWord;
+  static core::Property CleanSession;
+  static core::Property KeepLiveInterval;
+  static core::Property ConnectionTimeOut;
+  static core::Property Topic;
+  static core::Property QOS;
+  static core::Property SecurityProtocol;
+  static core::Property SecurityCA;
+  static core::Property SecurityCert;
+  static core::Property SecurityPrivateKey;
+  static core::Property SecurityPrivateKeyPassWord;
+
+  // Supported Relationships
+  static core::Relationship Failure;
+  static core::Relationship Success;
+
+ public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  virtual void onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory);
+  // OnTrigger method, implemented by NiFi AbstractMQTTProcessor
+  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession 
*session) {
+  }
+  // OnTrigger method, implemented by NiFi AbstractMQTTProcessor
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session) {
+  }
+  // Initialize, over write by NiFi AbstractMQTTProcessor
+  virtual void initialize(void);
+  // MQTT async callbacks
+  static void msgDelivered(void *context, MQTTClient_deliveryToken dt) {
+    AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context;
+    processor->delivered_token_ = dt;
+  }
+  static int msgReceived(void *context, char *topicName, int topicLen, 
MQTTClient_message *message) {
+    AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context;
+    if (processor->isSubscriber_) {
+      if (!processor->enqueueReceiveMQTTMsg(message))
+        MQTTClient_freeMessage(&message);
+    } else {
+      MQTTClient_freeMessage(&message);
+    }
+    MQTTClient_free(topicName);
+    return 1;
+  }
+  static void connectionLost(void *context, char *cause) {
+    AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context;
+    processor->reconnect();
+  }
+  bool reconnect();
+  // enqueue receive MQTT message
+  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message) {
+    return false;
+  }
+
+ protected:
+  MQTTClient client_;
+  MQTTClient_deliveryToken delivered_token_;
+  std::string uri_;
+  std::string topic_;
+  int64_t keepAliveInterval_;
+  int64_t connectionTimeOut_;
+  int64_t qos_;
+  bool cleanSession_;
+  std::string clientID_;
+  std::string userName_;
+  std::string passWord_;
+  bool isSubscriber_;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+  MQTTClient_SSLOptions sslopts_;
+  bool sslEnabled_;
+  std::string securityCA_;
+  std::string securityCert_;
+  std::string securityPrivateKey_;
+  std::string securityPrivateKeyPassWord_;
+};
+
+REGISTER_RESOURCE(AbstractMQTTProcessor);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

Reply via email to