MINIFICPP-394: Implement MQTT C2 protocol along with MQTT/REST translation layer and resolve osx build issues.
This closes #321. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/8dd7e91f Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/8dd7e91f Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/8dd7e91f Branch: refs/heads/master Commit: 8dd7e91f9ae2358cc22b839eeee006f0025b4783 Parents: 30df9a5 Author: Marc Parisi <[email protected]> Authored: Tue May 1 14:29:02 2018 -0400 Committer: Aldrin Piri <[email protected]> Committed: Tue May 8 14:03:13 2018 -0400 ---------------------------------------------------------------------- C2.md | 165 ++++++++- PROCESSORS.md | 57 ++++ README.md | 12 +- bootstrap.sh | 2 + extensions/mqtt/AbstractMQTTProcessor.cpp | 199 ----------- extensions/mqtt/AbstractMQTTProcessor.h | 168 --------- extensions/mqtt/CMakeLists.txt | 12 +- extensions/mqtt/ConsumeMQTT.cpp | 119 ------- extensions/mqtt/ConsumeMQTT.h | 125 ------- extensions/mqtt/MQTTLoader.h | 31 +- extensions/mqtt/PublishMQTT.cpp | 106 ------ extensions/mqtt/PublishMQTT.h | 142 -------- .../controllerservice/MQTTControllerService.cpp | 100 ++++++ .../controllerservice/MQTTControllerService.h | 342 +++++++++++++++++++ .../mqtt/processors/AbstractMQTTProcessor.cpp | 199 +++++++++++ .../mqtt/processors/AbstractMQTTProcessor.h | 168 +++++++++ extensions/mqtt/processors/ConsumeMQTT.cpp | 119 +++++++ extensions/mqtt/processors/ConsumeMQTT.h | 125 +++++++ extensions/mqtt/processors/ConvertBase.cpp | 69 ++++ extensions/mqtt/processors/ConvertBase.h | 90 +++++ extensions/mqtt/processors/ConvertHeartBeat.cpp | 75 ++++ extensions/mqtt/processors/ConvertHeartBeat.h | 78 +++++ extensions/mqtt/processors/ConvertJSONAck.cpp | 115 +++++++ extensions/mqtt/processors/ConvertJSONAck.h | 105 ++++++ extensions/mqtt/processors/ConvertUpdate.cpp | 106 ++++++ extensions/mqtt/processors/ConvertUpdate.h | 91 +++++ 
extensions/mqtt/processors/PublishMQTT.cpp | 106 ++++++ extensions/mqtt/processors/PublishMQTT.h | 142 ++++++++ extensions/mqtt/protocol/MQTTC2Protocol.cpp | 103 ++++++ extensions/mqtt/protocol/MQTTC2Protocol.h | 96 ++++++ extensions/mqtt/protocol/PayloadSerializer.cpp | 38 +++ extensions/mqtt/protocol/PayloadSerializer.h | 318 +++++++++++++++++ .../rocksdb-repos/DatabaseContentRepository.h | 4 +- libminifi/include/c2/C2Payload.h | 5 + .../include/core/controller/ControllerService.h | 5 + .../StandardControllerServiceProvider.h | 2 +- libminifi/include/properties/Configure.h | 14 + libminifi/include/utils/ByteArrayCallback.h | 16 +- libminifi/src/FlowController.cpp | 16 +- libminifi/src/c2/C2Agent.cpp | 1 + libminifi/src/c2/C2Payload.cpp | 7 + libminifi/src/core/ProcessSession.cpp | 2 +- .../StandardControllerServiceNode.cpp | 6 + libminifi/src/utils/ByteArrayCallback.cpp | 15 + 44 files changed, 2926 insertions(+), 890 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/C2.md ---------------------------------------------------------------------- diff --git a/C2.md b/C2.md index 194467c..307cd59 100644 --- a/C2.md +++ b/C2.md @@ -44,14 +44,16 @@ https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal in minifi.properties - # Disable/Enable C2 - nifi.c2.enable=true + # Disable/Enable C2 + nifi.c2.enable=true # specify classes for the AST response nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation # specify C2 protocol -- default is RESTSender if this is not specified c2.agent.protocol.class=RESTSender + # may also use MQTT + # c2.agent.protocol.class=MQTTC2Protocol # control c2 heartbeat interval in millisecocnds c2.agent.heartbeat.period=3000 @@ -71,7 +73,7 @@ https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal # configure SSL Context service for REST Protocol c2.rest.ssl.context.service - + ### Metrics @@ -127,3 
+129,160 @@ configuration produces the following JSON: } } + +### Protocols + +The default protocol is a RESTful service; however, there is an MQTT protocol with a translation to use the +RESTful C2 server. This is useful for cases where an MQTT C2 server isn't available, or enclave partitioning +requires a single ingress/egress through a gateway. In these classes of devices, MQTT can be used as the intermediate +or RESTSender can be used for C2 operations. + +As defined above, MQTTC2Protocol can be used for the agent protocol class. If you wish to communicate with a RESTful C2 server, +you may use the ConvertBase, ConvertHeartBeat, ConvertJSONAck, and ConvertUpdate classes on an agent to perform the translation. +State is not kept with an intermediate agent other than the broker. The broker is not embedded with the agent to simplify the agent. + +An example configuration, below, defines an agent that receives and forwards MQTT C2 requests to a C2 server. Additionally, this agent +will forward responses and updates to the heartbeating agents. 
+ + MiNiFi Config Version: 3 + Flow Controller: + name: GetFile + Core Properties: + flow controller graceful shutdown period: 10 sec + flow service write delay interval: 500 ms + administrative yield duration: 30 sec + bored yield duration: 10 millis + max concurrent threads: 1 + variable registry properties: '' + FlowFile Repository: + partitions: 256 + checkpoint interval: 2 mins + always sync: false + Swap: + threshold: 20000 + in period: 5 sec + in threads: 1 + out period: 5 sec + out threads: 4 + Content Repository: + content claim max appendable size: 10 MB + content claim max flow files: 100 + always sync: false + Provenance Repository: + provenance rollover time: 1 min + Component Status Repository: + buffer size: 1440 + snapshot frequency: 1 min + Security Properties: + keystore: '' + keystore type: '' + keystore password: '' + key password: '' + truststore: '' + truststore type: '' + truststore password: '' + ssl protocol: '' + Sensitive Props: + key: + algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL + provider: BC + Processors: + - id: 24493a28-015a-1000-0000-000000000000 + name: convert + class: org.apache.nifi.processors.standard.ConvertHeartBeat + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 10 msec + penalization period: 30 sec + yield period: 2 sec + run duration nanos: 10 msec + auto-terminated relationships list: + Properties: + MQTT Controller Service: mqttservice + Listening Topic: heartbeats + - id: 24493a28-015a-1000-0000-000000000006 + name: convertJSON + class: org.apache.nifi.processors.standard.ConvertJSONAck + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 10 msec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 10 msec + auto-terminated relationships list: + - success + Properties: + MQTT Controller Service: mqttservice + - id: 24493a28-015a-1000-0000-000000000007 + name: convertupdate + class: org.apache.nifi.processors.standard.ConvertUpdate + 
max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 10 msec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 10 msec + auto-terminated relationships list: + - success + Properties: + MQTT Controller Service: mqttservice + Listening Topic: updates + - id: 24493a28-015a-1000-0000-000000000021 + name: httpheartbeat + class: org.apache.nifi.processors.standard.InvokeHTTP + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 10 msec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 10 msec + auto-terminated relationships list: + Properties: + HTTP Method: POST + Remote URL: http://localhost:10080/minifi-c2-api/c2-protocol/heartbeat + Content-type: application/json + - id: 24493a28-015a-1000-0000-000000000022 + name: log + class: org.apache.nifi.processors.standard.LogAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 100 msec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 1 msec + auto-terminated relationships list: + - success + Properties: + Controller Services: + - id: 94491a38-015a-1000-0000-000000000001 + name: mqttservice + class: MQTTContextService + Properties: + Broker URI: localhost:1883 + Client ID: hiblajl + Quality of Service: 2 + Process Groups: [] + Input Ports: [] + Output Ports: [] + Funnels: [] + Connections: + - id: 1d09c015-015d-1000-0000-000000000000 + name: convert/success/httpheartbeat + source id: 24493a28-015a-1000-0000-000000000000 + source relationship name: success + destination id: 24493a28-015a-1000-0000-000000000021 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer + - id: 1d09c015-015d-1000-0000-000000000002 + name: httpheartbeat/success/convertJSON + source id: 24493a28-015a-1000-0000-000000000021 + source relationship name: success + 
destination id: 24493a28-015a-1000-0000-000000000006 + max work queue size: 10000 + max work queue data size: 1 GB + flowfile expiration: 0 sec + queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer + Remote Process Groups: [] + NiFi Properties Overrides: {} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/PROCESSORS.md ---------------------------------------------------------------------- diff --git a/PROCESSORS.md b/PROCESSORS.md index e57c16e..4588236 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -21,6 +21,9 @@ - [ApplyTemplate](#applytemplate) - [CompressContent](#compresscontent) - [ConsumeMQTT](#consumemqtt) +- [ConvertHeartBeat](#convertheartbeat) +- [ConvertJSONAck](#convertjsonack) +- [ConvertUpdate](#convertupdate) - [ExecuteProcess](#executeprocess) - [ExecuteScript](#executescript) - [ExecuteSQL](#executesql) @@ -96,6 +99,60 @@ default values, and whether a property supports the NiFi Expression Language. | - | - | | success | All FlowFiles are routed to this relationship. | +## ConvertHeartBeat + +This Processor converts MQTT heartbeats into a JSON representation. + +### Properties + +In the list below, the names of required properties appear in bold. Any other +properties (not in bold) are considered optional. The table also indicates any +default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +| - | - | - | - | +| **MQTT Controller Service** | | | The MQTT Controller service | +| Listening topic | | | The topic on which we will listen to get MQTT C2 messages | + + +## ConvertJSONAck + +This Processor parses C2 responses (acknowledgements) and forwards them to the MQTT agent. + +### Properties + +In the list below, the names of required properties appear in bold. Any other +properties (not in bold) are considered optional. 
The table also indicates any +default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +| - | - | - | - | +| **MQTT Controller Service** | | | The MQTT Controller service | +| Listening topic | | | The topic on which we will listen to get MQTT C2 messages | + +### Relationships + +| Name | Description | +| - | - | +| success | Any successful http response flow file will be sent to this relationship | + +## ConvertUpdate + +This converts MQTT update messages into an HTTP request to retrieve an update. This +processor requires cURL support. If it does not exist this processor will be a NOOP. + +### Properties + +In the list below, the names of required properties appear in bold. Any other +properties (not in bold) are considered optional. The table also indicates any +default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +| - | - | - | - | +| **MQTT Controller Service** | | | The MQTT Controller service | +| SSL Context Service | | | SSL context service used for HTTP requestor. | +| Listening topic | | | The topic on which we will listen to get MQTT C2 messages | + ## ExecuteProcess ### Description http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 3f85f82..32eb897 100644 --- a/README.md +++ b/README.md @@ -702,7 +702,17 @@ Additionally, a unique hexadecimal uid.minifi.device.segment should be assigned Trigger Threshold: 90 Low Battery Threshold: 50 Wait Period: 500 ms - +### MQTT Controller service +The MQTTController Service can be configured for MQTT connectivity and provide that capability to your processors when MQTT is built. 
+ + Controller Services: + - name: mqttservice + id: 294491a38-015a-1000-0000-000000000001 + class: MQTTContextService + Properties: + Broker URI: localhost:1883 + Client ID: client ID + Quality of Service: 2 ### Running After completing a [build](#building), the application can be run by issuing the following from : http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/bootstrap.sh ---------------------------------------------------------------------- diff --git a/bootstrap.sh b/bootstrap.sh index 7706a58..1ea5646 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -185,6 +185,8 @@ elif [[ "$OS" = Deb* ]]; then . "${script_directory}/debian.sh" elif [[ "$OS" = Rasp* ]]; then . "${script_directory}/aptitude.sh" +elif [[ "$OS" = Pop* ]]; then + . "${script_directory}/aptitude.sh" elif [[ "$OS" = Ubuntu* ]]; then . "${script_directory}/aptitude.sh" elif [[ "$OS" = *SUSE* ]]; then http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/AbstractMQTTProcessor.cpp ---------------------------------------------------------------------- diff --git a/extensions/mqtt/AbstractMQTTProcessor.cpp b/extensions/mqtt/AbstractMQTTProcessor.cpp deleted file mode 100644 index 345c6c5..0000000 --- a/extensions/mqtt/AbstractMQTTProcessor.cpp +++ /dev/null @@ -1,199 +0,0 @@ -/** - * @file AbstractMQTTProcessor.cpp - * AbstractMQTTProcessor class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "AbstractMQTTProcessor.h" -#include <stdio.h> -#include <memory> -#include <string> -#include "utils/TimeUtil.h" -#include "utils/StringUtils.h" -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { - -core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); -core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true"); -core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); -core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); -core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); -core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); -core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); -core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. 
Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); -core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); -core::Property AbstractMQTTProcessor::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", ""); -core::Property AbstractMQTTProcessor::SecurityCA("Security CA", "File or directory path to CA certificate(s) for verifying the broker's key", ""); -core::Property AbstractMQTTProcessor::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", ""); -core::Property AbstractMQTTProcessor::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", ""); -core::Property AbstractMQTTProcessor::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", ""); -core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); -core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); - -void AbstractMQTTProcessor::initialize() { - // Set the supported properties - std::set<core::Property> properties; - properties.insert(BrokerURL); - properties.insert(CleanSession); - properties.insert(ClientID); - properties.insert(UserName); - properties.insert(PassWord); - properties.insert(KeepLiveInterval); - properties.insert(ConnectionTimeOut); - properties.insert(QOS); - properties.insert(Topic); - setSupportedProperties(properties); - // Set the supported relationships - std::set<core::Relationship> relationships; - relationships.insert(Success); - relationships.insert(Failure); - setSupportedRelationships(relationships); - MQTTClient_SSLOptions sslopts_ = MQTTClient_SSLOptions_initializer; - sslEnabled_ = false; -} - -void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory 
*sessionFactory) { - std::string value; - int64_t valInt; - value = ""; - if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { - uri_ = value; - logger_->log_debug("AbstractMQTTProcessor: BrokerURL [%s]", uri_); - } - value = ""; - if (context->getProperty(ClientID.getName(), value) && !value.empty()) { - clientID_ = value; - logger_->log_debug("AbstractMQTTProcessor: ClientID [%s]", clientID_); - } - value = ""; - if (context->getProperty(Topic.getName(), value) && !value.empty()) { - topic_ = value; - logger_->log_debug("AbstractMQTTProcessor: Topic [%s]", topic_); - } - value = ""; - if (context->getProperty(UserName.getName(), value) && !value.empty()) { - userName_ = value; - logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", userName_); - } - value = ""; - if (context->getProperty(PassWord.getName(), value) && !value.empty()) { - passWord_ = value; - logger_->log_debug("AbstractMQTTProcessor: PassWord [%s]", passWord_); - } - value = ""; - if (context->getProperty(CleanSession.getName(), value) && !value.empty() && - org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, cleanSession_)) { - logger_->log_debug("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_); - } - value = ""; - if (context->getProperty(KeepLiveInterval.getName(), value) && !value.empty()) { - core::TimeUnit unit; - if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) { - keepAliveInterval_ = valInt/1000; - logger_->log_debug("AbstractMQTTProcessor: KeepLiveInterval [%ll]", keepAliveInterval_); - } - } - value = ""; - if (context->getProperty(ConnectionTimeOut.getName(), value) && !value.empty()) { - core::TimeUnit unit; - if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) { - connectionTimeOut_ = valInt/1000; - logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeOut [%ll]", connectionTimeOut_); - } - } - value = 
""; - if (context->getProperty(QOS.getName(), value) && !value.empty() && (value == MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) && - core::Property::StringToInt(value, valInt)) { - qos_ = valInt; - logger_->log_debug("AbstractMQTTProcessor: QOS [%ll]", qos_); - } - value = ""; - - if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) { - if (value == MQTT_SECURITY_PROTOCOL_SSL) { - sslEnabled_ = true; - logger_->log_debug("AbstractMQTTProcessor: ssl enable"); - value = ""; - if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) { - logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value); - securityCA_ = value; - sslopts_.trustStore = securityCA_.c_str(); - } - value = ""; - if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) { - logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", value); - securityCert_ = value; - sslopts_.keyStore = securityCert_.c_str(); - } - value = ""; - if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) { - logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", value); - securityPrivateKey_ = value; - sslopts_.privateKey = securityPrivateKey_.c_str(); - } - value = ""; - if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) { - logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", value); - securityPrivateKeyPassWord_ = value; - sslopts_.privateKeyPassword = securityPrivateKeyPassWord_.c_str(); - } - } - } - if (!client_) { - MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL); - } - if (client_) { - MQTTClient_setCallbacks(client_, (void *) this, connectionLost, msgReceived, msgDelivered); - // call reconnect to bootstrap - this->reconnect(); - } -} - -bool AbstractMQTTProcessor::reconnect() { - if (!client_) - return false; - if (MQTTClient_isConnected(client_)) - return true; - MQTTClient_connectOptions conn_opts = 
MQTTClient_connectOptions_initializer; - conn_opts.keepAliveInterval = keepAliveInterval_; - conn_opts.cleansession = cleanSession_; - if (!userName_.empty()) { - conn_opts.username = userName_.c_str(); - conn_opts.password = passWord_.c_str(); - } - if (sslEnabled_) - conn_opts.ssl = &sslopts_; - if (MQTTClient_connect(client_, &conn_opts) != MQTTCLIENT_SUCCESS) { - logger_->log_error("Failed to connect to MQTT broker %s", uri_); - return false; - } - if (isSubscriber_) { - MQTTClient_subscribe(client_, topic_.c_str(), qos_); - } - return true; -} - -} /* namespace processors */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/AbstractMQTTProcessor.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/AbstractMQTTProcessor.h b/extensions/mqtt/AbstractMQTTProcessor.h deleted file mode 100644 index aab0ef5..0000000 --- a/extensions/mqtt/AbstractMQTTProcessor.h +++ /dev/null @@ -1,168 +0,0 @@ -/** - * @file AbstractMQTTProcessor.h - * AbstractMQTTProcessor class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef __ABSTRACTMQTT_H__ -#define __ABSTRACTMQTT_H__ - -#include "FlowFileRecord.h" -#include "core/Processor.h" -#include "core/ProcessSession.h" -#include "core/Core.h" -#include "core/Resource.h" -#include "core/logging/LoggerConfiguration.h" -#include "MQTTClient.h" - -#define MQTT_QOS_0 "0" -#define MQTT_QOS_1 "1" -#define MQTT_QOS_2 "2" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { - -#define MQTT_SECURITY_PROTOCOL_PLAINTEXT "plaintext" -#define MQTT_SECURITY_PROTOCOL_SSL "ssl" - -// AbstractMQTTProcessor Class -class AbstractMQTTProcessor : public core::Processor { - public: - // Constructor - /*! - * Create a new processor - */ - explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL) - : core::Processor(name, uuid), - logger_(logging::LoggerFactory<AbstractMQTTProcessor>::getLogger()) { - client_ = nullptr; - cleanSession_ = false; - keepAliveInterval_ = 60; - connectionTimeOut_ = 30; - qos_ = 0; - isSubscriber_ = false; - } - // Destructor - virtual ~AbstractMQTTProcessor() { - if (isSubscriber_) { - MQTTClient_unsubscribe(client_, topic_.c_str()); - } - if (client_ && MQTTClient_isConnected(client_)) { - MQTTClient_disconnect(client_, connectionTimeOut_); - } - if (client_) - MQTTClient_destroy(&client_); - } - // Processor Name - static constexpr char const* ProcessorName = "AbstractMQTTProcessor"; - // Supported Properties - static core::Property BrokerURL; - static core::Property ClientID; - static core::Property UserName; - static core::Property PassWord; - static core::Property CleanSession; - static core::Property KeepLiveInterval; - static core::Property ConnectionTimeOut; - static core::Property Topic; - static core::Property QOS; - static core::Property SecurityProtocol; - static core::Property SecurityCA; - static core::Property SecurityCert; - static core::Property SecurityPrivateKey; - static core::Property SecurityPrivateKeyPassWord; - - // Supported Relationships - 
static core::Relationship Failure; - static core::Relationship Success; - - public: - /** - * Function that's executed when the processor is scheduled. - * @param context process context. - * @param sessionFactory process session factory that is used when creating - * ProcessSession objects. - */ - virtual void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); - // OnTrigger method, implemented by NiFi AbstractMQTTProcessor - virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - } - // OnTrigger method, implemented by NiFi AbstractMQTTProcessor - virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { - } - // Initialize, over write by NiFi AbstractMQTTProcessor - virtual void initialize(void); - // MQTT async callbacks - static void msgDelivered(void *context, MQTTClient_deliveryToken dt) { - AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context; - processor->delivered_token_ = dt; - } - static int msgReceived(void *context, char *topicName, int topicLen, MQTTClient_message *message) { - AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context; - if (processor->isSubscriber_) { - if (!processor->enqueueReceiveMQTTMsg(message)) - MQTTClient_freeMessage(&message); - } else { - MQTTClient_freeMessage(&message); - } - MQTTClient_free(topicName); - return 1; - } - static void connectionLost(void *context, char *cause) { - AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context; - processor->reconnect(); - } - bool reconnect(); - // enqueue receive MQTT message - virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message) { - return false; - } - - protected: - MQTTClient client_; - MQTTClient_deliveryToken delivered_token_; - std::string uri_; - std::string topic_; - int64_t keepAliveInterval_; - int64_t connectionTimeOut_; - int64_t qos_; - bool cleanSession_; - std::string 
clientID_; - std::string userName_; - std::string passWord_; - bool isSubscriber_; - - private: - std::shared_ptr<logging::Logger> logger_; - MQTTClient_SSLOptions sslopts_; - bool sslEnabled_; - std::string securityCA_; - std::string securityCert_; - std::string securityPrivateKey_; - std::string securityPrivateKeyPassWord_; -}; - -REGISTER_RESOURCE(AbstractMQTTProcessor); - -} /* namespace processors */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/mqtt/CMakeLists.txt b/extensions/mqtt/CMakeLists.txt index d248524..3374557 100644 --- a/extensions/mqtt/CMakeLists.txt +++ b/extensions/mqtt/CMakeLists.txt @@ -20,11 +20,11 @@ set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") -include_directories(../../libminifi/include ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/) +include_directories(./controllerservice ./processors ./protocol ../../libminifi/include ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/) include_directories(../../thirdparty/paho.mqtt.c/src) -file(GLOB SOURCES "*.cpp") +file(GLOB SOURCES "*.cpp" "protocol/*.cpp" "processors/*.cpp" "controllerservice/*.cpp") add_library(minifi-mqtt-extensions STATIC ${SOURCES}) set_property(TARGET minifi-mqtt-extensions PROPERTY POSITION_INDEPENDENT_CODE ON) @@ -37,10 +37,6 @@ endif() # Include UUID -find_package(UUID REQUIRED) -target_link_libraries(minifi-mqtt-extensions ${LIBMINIFI} ${UUID_LIBRARIES} ${JSONCPP_LIB}) -find_package(OpenSSL REQUIRED) 
-include_directories(${OPENSSL_INCLUDE_DIR}) target_link_libraries(minifi-mqtt-extensions ${CMAKE_DL_LIBS} ) if (MQTT_FOUND AND NOT BUILD_MQTT) target_link_libraries(minifi-mqtt-extensions ${MQTT_LIBRARIES} ) @@ -55,7 +51,7 @@ include_directories(${ZLIB_INCLUDE_DIRS}) target_link_libraries (minifi-mqtt-extensions ${ZLIB_LIBRARIES}) if (WIN32) set_target_properties(minifi-mqtt-extensions PROPERTIES - LINK_FLAGS "/WHOLEMQTT" + LINK_FLAGS "/WHOLEARCHIVE" ) elseif (APPLE) set_target_properties(minifi-mqtt-extensions PROPERTIES @@ -63,7 +59,7 @@ elseif (APPLE) ) else () set_target_properties(minifi-mqtt-extensions PROPERTIES - LINK_FLAGS "-Wl,--whole-mqtt" + LINK_FLAGS "-Wl,--whole-archive" ) endif () http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/ConsumeMQTT.cpp ---------------------------------------------------------------------- diff --git a/extensions/mqtt/ConsumeMQTT.cpp b/extensions/mqtt/ConsumeMQTT.cpp deleted file mode 100644 index 472d35f..0000000 --- a/extensions/mqtt/ConsumeMQTT.cpp +++ /dev/null @@ -1,119 +0,0 @@ -/** - * @file ConsumeMQTT.cpp - * ConsumeMQTT class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include "ConsumeMQTT.h" -#include <stdio.h> -#include <algorithm> -#include <memory> -#include <string> -#include <map> -#include <set> -#include "utils/TimeUtil.h" -#include "utils/StringUtils.h" -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { - -core::Property ConsumeMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); -core::Property ConsumeMQTT::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the received MQTT queue", ""); - -void ConsumeMQTT::initialize() { - // Set the supported properties - std::set<core::Property> properties; - properties.insert(BrokerURL); - properties.insert(CleanSession); - properties.insert(ClientID); - properties.insert(UserName); - properties.insert(PassWord); - properties.insert(KeepLiveInterval); - properties.insert(ConnectionTimeOut); - properties.insert(QOS); - properties.insert(Topic); - properties.insert(MaxFlowSegSize); - properties.insert(QueueBufferMaxMessage); - setSupportedProperties(properties); - // Set the supported relationships - std::set<core::Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); -} - -bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) { - if (queue_.size_approx() >= maxQueueSize_) { - logger_->log_debug("MQTT queue full"); - return false; - } else { - if (message->payloadlen > maxSegSize_) - message->payloadlen = maxSegSize_; - queue_.enqueue(message); - logger_->log_debug("enqueue MQTT message length %d", message->payloadlen); - return true; - } -} - -void ConsumeMQTT::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { - AbstractMQTTProcessor::onSchedule(context, sessionFactory); - std::string value; - int64_t valInt; - value = ""; - if 
(context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { - maxQueueSize_ = valInt; - logger_->log_debug("ConsumeMQTT: Queue Max Message [%ll]", maxQueueSize_); - } - value = ""; - if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { - maxSegSize_ = valInt; - logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%ll]", maxSegSize_); - } -} - -void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { - // reconnect if necessary - reconnect(); - std::deque<MQTTClient_message *> msg_queue; - getReceivedMQTTMsg(msg_queue); - while (!msg_queue.empty()) { - MQTTClient_message *message = msg_queue.front(); - std::shared_ptr<core::FlowFile> processFlowFile = session->create(); - ConsumeMQTT::WriteCallback callback(message); - session->write(processFlowFile, &callback); - if (callback.status_ < 0) { - logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr()); - session->remove(processFlowFile); - } else { - session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_.c_str()); - session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_.c_str()); - logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", processFlowFile->getUUIDStr(), topic_); - session->transfer(processFlowFile, Success); - } - MQTTClient_freeMessage(&message); - msg_queue.pop_front(); - } -} - -} /* namespace processors */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/ConsumeMQTT.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/ConsumeMQTT.h b/extensions/mqtt/ConsumeMQTT.h deleted file mode 100644 index 
0b26d42..0000000 --- a/extensions/mqtt/ConsumeMQTT.h +++ /dev/null @@ -1,125 +0,0 @@ -/** - * @file ConsumeMQTT.h - * ConsumeMQTT class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __CONSUME_MQTT_H__ -#define __CONSUME_MQTT_H__ - -#include <limits> -#include <deque> -#include "FlowFileRecord.h" -#include "core/Processor.h" -#include "core/ProcessSession.h" -#include "core/Core.h" -#include "core/Resource.h" -#include "core/Property.h" -#include "core/logging/LoggerConfiguration.h" -#include "concurrentqueue.h" -#include "MQTTClient.h" -#include "AbstractMQTTProcessor.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { - -#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic" -#define MQTT_BROKER_ATTRIBUTE "mqtt.broker" - -// ConsumeMQTT Class -class ConsumeMQTT: public processors::AbstractMQTTProcessor { -public: - // Constructor - /*! 
- * Create a new processor - */ - explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL) - : processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory<ConsumeMQTT>::getLogger()) { - isSubscriber_ = true; - maxQueueSize_ = 100; - maxSegSize_ = ULLONG_MAX; - } - // Destructor - virtual ~ConsumeMQTT() { - MQTTClient_message *message; - while (queue_.try_dequeue(message)) { - MQTTClient_freeMessage(&message); - } - } - // Processor Name - static constexpr char const* ProcessorName = "ConsumeMQTT"; - // Supported Properties - static core::Property MaxFlowSegSize; - static core::Property QueueBufferMaxMessage; - // Nest Callback Class for write stream - class WriteCallback: public OutputStreamCallback { - public: - WriteCallback(MQTTClient_message *message) : - message_(message) { - status_ = 0; - } - MQTTClient_message *message_; - int64_t process(std::shared_ptr<io::BaseStream> stream) { - int64_t len = stream->write(reinterpret_cast<uint8_t*>(message_->payload), message_->payloadlen); - if (len < 0) - status_ = -1; - return len; - } - int status_; - }; - -public: - /** - * Function that's executed when the processor is scheduled. - * @param context process context. - * @param sessionFactory process session factory that is used when creating - * ProcessSession objects. 
- */ - void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); - // OnTrigger method, implemented by NiFi ConsumeMQTT - virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session); - // Initialize, over write by NiFi ConsumeMQTT - virtual void initialize(void); - virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message); - -protected: - void getReceivedMQTTMsg(std::deque<MQTTClient_message *> &msg_queue) { - MQTTClient_message *message; - while (queue_.try_dequeue(message)) { - msg_queue.push_back(message); - } - } - -private: - std::shared_ptr<logging::Logger> logger_; - std::mutex mutex_; - uint64_t maxQueueSize_; - uint64_t maxSegSize_; - moodycamel::ConcurrentQueue<MQTTClient_message *> queue_; -}; - -REGISTER_RESOURCE (ConsumeMQTT); - -} /* namespace processors */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/MQTTLoader.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/MQTTLoader.h b/extensions/mqtt/MQTTLoader.h index d337af5..d8d3e8f 100644 --- a/extensions/mqtt/MQTTLoader.h +++ b/extensions/mqtt/MQTTLoader.h @@ -18,10 +18,14 @@ #ifndef EXTENSION_MQTTLOADER_H #define EXTENSION_MQTTLOADER_H -#include "PublishMQTT.h" -#include "ConsumeMQTT.h" +#include "controllerservice/MQTTControllerService.h" +#include "processors/PublishMQTT.h" +#include "processors/ConsumeMQTT.h" +#include "MQTTC2Protocol.h" #include "core/ClassLoader.h" - +#include "ConvertHeartBeat.h" +#include "ConvertJSONAck.h" +#include "ConvertUpdate.h" class __attribute__((visibility("default"))) MQTTFactory : public core::ObjectFactory { public: MQTTFactory() { @@ -47,17 +51,30 @@ class __attribute__((visibility("default"))) MQTTFactory : public core::ObjectFa std::vector<std::string> 
class_names; class_names.push_back("PublishMQTT"); class_names.push_back("ConsumeMQTT"); + class_names.push_back("MQTTContextService"); + class_names.push_back("MQTTC2Protocol"); + class_names.push_back("ConvertHeartBeat"); + class_names.push_back("ConvertJSONAck"); + class_names.push_back("ConvertUpdate"); return class_names; } virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) { if (utils::StringUtils::equalsIgnoreCase(class_name, "PublishMQTT")) { return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::PublishMQTT>()); - } - else if (utils::StringUtils::equalsIgnoreCase(class_name, "ConsumeMQTT")) { + } else if (utils::StringUtils::equalsIgnoreCase(class_name, "ConsumeMQTT")) { return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::ConsumeMQTT>()); - } - else { + } else if (utils::StringUtils::equalsIgnoreCase(class_name, "MQTTContextService")) { + return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::controllers::MQTTControllerService>()); + } else if (utils::StringUtils::equalsIgnoreCase(class_name, "MQTTC2Protocol")) { + return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::c2::MQTTC2Protocol>()); + } else if (utils::StringUtils::equalsIgnoreCase(class_name, "ConvertHeartBeat")) { + return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::ConvertHeartBeat>()); + } else if (utils::StringUtils::equalsIgnoreCase(class_name, "ConvertJSONAck")) { + return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::ConvertJSONAck>()); + } else if (utils::StringUtils::equalsIgnoreCase(class_name, "ConvertUpdate")) { + return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::ConvertUpdate>()); + } else { return nullptr; } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/PublishMQTT.cpp 
---------------------------------------------------------------------- diff --git a/extensions/mqtt/PublishMQTT.cpp b/extensions/mqtt/PublishMQTT.cpp deleted file mode 100644 index 411cc2d..0000000 --- a/extensions/mqtt/PublishMQTT.cpp +++ /dev/null @@ -1,106 +0,0 @@ -/** - * @file PublishMQTT.cpp - * PublishMQTT class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include "PublishMQTT.h" -#include <stdio.h> -#include <algorithm> -#include <memory> -#include <string> -#include <map> -#include <set> -#include "utils/TimeUtil.h" -#include "utils/StringUtils.h" -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { - -core::Property PublishMQTT::Retain("Retain", "Retain MQTT published record in broker", "false"); -core::Property PublishMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); - -void PublishMQTT::initialize() { - // Set the supported properties - std::set<core::Property> properties; - properties.insert(BrokerURL); - properties.insert(CleanSession); - properties.insert(ClientID); - properties.insert(UserName); - properties.insert(PassWord); - properties.insert(KeepLiveInterval); - properties.insert(ConnectionTimeOut); - properties.insert(QOS); - properties.insert(Topic); - properties.insert(Retain); - properties.insert(MaxFlowSegSize); - setSupportedProperties(properties); - // Set the supported relationships - std::set<core::Relationship> relationships; - relationships.insert(Success); - relationships.insert(Failure); - setSupportedRelationships(relationships); -} - -void PublishMQTT::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { - AbstractMQTTProcessor::onSchedule(context, sessionFactory); - std::string value; - int64_t valInt; - value = ""; - if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { - max_seg_size_ = valInt; - logger_->log_debug("PublishMQTT: max flow segment size [%ll]", max_seg_size_); - } - value = ""; - if (context->getProperty(Retain.getName(), value) && !value.empty() && org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, retain_)) { - logger_->log_debug("PublishMQTT: Retain [%d]", 
retain_); - } -} - -void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { - std::shared_ptr<core::FlowFile> flowFile = session->get(); - - if (!flowFile) { - return; - } - - if (!reconnect()) { - logger_->log_error("MQTT connect to %s failed", uri_); - session->transfer(flowFile, Failure); - return; - } - - PublishMQTT::ReadCallback callback(flowFile->getSize(), max_seg_size_, topic_, client_, qos_, retain_, delivered_token_); - session->read(flowFile, &callback); - if (callback.status_ < 0) { - logger_->log_error("Failed to send flow to MQTT topic %s", topic_); - session->transfer(flowFile, Failure); - } else { - logger_->log_debug("Sent flow with length %d to MQTT topic %s", callback.read_size_, topic_); - session->transfer(flowFile, Success); - } -} - -} /* namespace processors */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/PublishMQTT.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/PublishMQTT.h b/extensions/mqtt/PublishMQTT.h deleted file mode 100644 index ed17bd4..0000000 --- a/extensions/mqtt/PublishMQTT.h +++ /dev/null @@ -1,142 +0,0 @@ -/** - * @file PublishMQTT.h - * PublishMQTT class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __PUBLISH_MQTT_H__ -#define __PUBLISH_MQTT_H__ - -#include "FlowFileRecord.h" -#include "core/Processor.h" -#include "core/ProcessSession.h" -#include "core/Core.h" -#include "core/Resource.h" -#include "core/Property.h" -#include "core/logging/LoggerConfiguration.h" -#include "MQTTClient.h" -#include "AbstractMQTTProcessor.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { - -// PublishMQTT Class -class PublishMQTT: public processors::AbstractMQTTProcessor { -public: - // Constructor - /*! - * Create a new processor - */ - explicit PublishMQTT(std::string name, uuid_t uuid = NULL) - : processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory<PublishMQTT>::getLogger()) { - retain_ = false; - max_seg_size_ = ULLONG_MAX; - } - // Destructor - virtual ~PublishMQTT() { - } - // Processor Name - static constexpr char const* ProcessorName = "PublishMQTT"; - // Supported Properties - static core::Property Retain; - static core::Property MaxFlowSegSize; - - // Nest Callback Class for read stream - class ReadCallback: public InputStreamCallback { - public: - ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, MQTTClient client, - int qos, bool retain, MQTTClient_deliveryToken &token) : - flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), client_(client), - qos_(qos), retain_(retain), token_(token) { - status_ = 0; - read_size_ = 0; - } - ~ReadCallback() { - } - int64_t process(std::shared_ptr<io::BaseStream> stream) { - if (flow_size_ < 
max_seg_size_) - max_seg_size_ = flow_size_; - std::vector<unsigned char> buffer; - buffer.reserve(max_seg_size_); - read_size_ = 0; - status_ = 0; - while (read_size_ < flow_size_) { - int readRet = stream->read(&buffer[0], max_seg_size_); - if (readRet < 0) { - status_ = -1; - return read_size_; - } - if (readRet > 0) { - MQTTClient_message pubmsg = MQTTClient_message_initializer; - pubmsg.payload = &buffer[0]; - pubmsg.payloadlen = readRet; - pubmsg.qos = qos_; - pubmsg.retained = retain_; - if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg, &token_) != MQTTCLIENT_SUCCESS) { - status_ = -1; - return -1; - } - read_size_ += readRet; - } else { - break; - } - } - return read_size_; - } - uint64_t flow_size_; - uint64_t max_seg_size_; - std::string key_; - MQTTClient client_;; - int status_; - int read_size_; - int qos_; - int retain_; - MQTTClient_deliveryToken &token_; - }; - -public: - /** - * Function that's executed when the processor is scheduled. - * @param context process context. - * @param sessionFactory process session factory that is used when creating - * ProcessSession objects. 
- */ - void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); - // OnTrigger method, implemented by NiFi PublishMQTT - virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session); - // Initialize, over write by NiFi PublishMQTT - virtual void initialize(void); - -protected: - -private: - uint64_t max_seg_size_; - bool retain_; - std::shared_ptr<logging::Logger> logger_; -}; - -REGISTER_RESOURCE (PublishMQTT); - -} /* namespace processors */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/controllerservice/MQTTControllerService.cpp ---------------------------------------------------------------------- diff --git a/extensions/mqtt/controllerservice/MQTTControllerService.cpp b/extensions/mqtt/controllerservice/MQTTControllerService.cpp new file mode 100644 index 0000000..b67fc6b --- /dev/null +++ b/extensions/mqtt/controllerservice/MQTTControllerService.cpp @@ -0,0 +1,100 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include "MQTTControllerService.h"
+
+#include <openssl/err.h>
+#include <openssl/ssl.h>
+#include <string>
+#include <memory>
+#include <mutex>
+#include <set>
+#include "core/Property.h"
+#include "io/validation.h"
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+// Supported properties: name, description, default value.
+core::Property MQTTControllerService::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", "");
+core::Property MQTTControllerService::ClientID("Client ID", "MQTT client ID to use", "");
+core::Property MQTTControllerService::UserName("Username", "Username to use when connecting to the broker", "");
+core::Property MQTTControllerService::Password("Password", "Password to use when connecting to the broker", "");
+core::Property MQTTControllerService::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
+core::Property MQTTControllerService::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
+// The default has to be one of the accepted values: use the MQTT_QOS_0 macro ("0", defined in
+// MQTTControllerService.h) rather than the quoted macro name, which is not a valid QoS level.
+core::Property MQTTControllerService::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", MQTT_QOS_0);
+core::Property MQTTControllerService::Topic("Topic", "The topic to publish the message to", "");
+core::Property MQTTControllerService::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", "");
+
+/**
+ * Performs the one-time initialization of the service: initializes the base
+ * ControllerService and registers the supported properties.
+ * Calling it again after a successful initialization is a no-op.
+ */
+void MQTTControllerService::initialize() {
+  // Acquire the lock before testing the flag so that two concurrent callers can neither
+  // both run the initialization nor read initialized_ while another thread writes it.
+  std::lock_guard<std::mutex> lock(initialization_mutex_);
+
+  if (initialized_)
+    return;
+
+  ControllerService::initialize();
+
+  initializeProperties();
+
+  initialized_ = true;
+}
+
+/**
+ * Enables the service: remembers a linked SSLContextService (if any), creates the
+ * MQTT client from the Broker URI / Client ID properties when it does not exist yet,
+ * installs the client callbacks and triggers the first connection attempt.
+ * Nothing is done when either the Broker URI or the Client ID cannot be read.
+ */
+void MQTTControllerService::onEnable() {
+  for (auto &linked_service : linked_services_) {
+    std::shared_ptr<controllers::SSLContextService> ssl_service = std::dynamic_pointer_cast<controllers::SSLContextService>(linked_service);
+    if (nullptr != ssl_service) {
+      // security is enabled.
+      ssl_context_service_ = ssl_service;
+    }
+  }
+  if (getProperty(BrokerURL.getName(), uri_) && getProperty(ClientID.getName(), clientID_)) {
+    if (!client_) {
+      const int rc = MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);
+      if (rc != MQTTCLIENT_SUCCESS) {
+        // Report the failure instead of silently leaving the service without a client.
+        logger_->log_error("Could not create MQTT client for %s, error code %d", uri_, rc);
+      }
+    }
+
+    if (client_) {
+      MQTTClient_setCallbacks(client_, static_cast<void *>(this), reconnectCallback, receiveCallback, deliveryCallback);
+      // call reconnect to bootstrap
+      this->reconnect();
+    }
+  }
+}
+
+/**
+ * Registers the properties this controller service supports.
+ */
+void MQTTControllerService::initializeProperties() {
+  std::set<core::Property> supportedProperties;
+  supportedProperties.insert(BrokerURL);
+  supportedProperties.insert(ClientID);
+  supportedProperties.insert(UserName);
+  supportedProperties.insert(Password);
+
+  supportedProperties.insert(KeepLiveInterval);
+  supportedProperties.insert(ConnectionTimeOut);
+  supportedProperties.insert(Topic);
+  supportedProperties.insert(QOS);
+  supportedProperties.insert(SecurityProtocol);
+  setSupportedProperties(supportedProperties);
+}
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/controllerservice/MQTTControllerService.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/controllerservice/MQTTControllerService.h b/extensions/mqtt/controllerservice/MQTTControllerService.h new file mode 100644 index 0000000..a7bbebd --- /dev/null +++ b/extensions/mqtt/controllerservice/MQTTControllerService.h @@ -0,0 +1,342 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_MQTTCONTEXTSERVICE_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_MQTTCONTEXTSERVICE_H_
+
+#include <openssl/err.h>
+#include <openssl/ssl.h>
+#include <cstddef>
+#include <cstdint>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "io/validation.h"
+#include "core/controller/ControllerService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "controllers/SSLContextService.h"
+#include "concurrentqueue.h"
+#include "MQTTClient.h"
+
+#define MQTT_QOS_0 "0"
+#define MQTT_QOS_1 "1"
+#define MQTT_QOS_2 "2"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * Message pairs a topic name with a private copy of a payload.
+ * The type is movable but not copyable: declaring the move operations
+ * suppresses the implicit copy operations.
+ */
+class Message {
+ public:
+  // empty constructor facilitates moves
+  Message() {
+  }
+  /**
+   * Copies dataLen bytes starting at data into the message.
+   * @param topic topic the payload belongs to.
+   * @param data start of the payload bytes; they are copied, not referenced.
+   * @param dataLen number of bytes to copy.
+   */
+  explicit Message(const std::string &topic, void *data, size_t dataLen)
+      : topic_(topic),
+        data_(static_cast<uint8_t*>(data), static_cast<uint8_t*>(data) + dataLen) {
+  }
+  // Takes a non-const rvalue reference: with a const one std::move yields a const rvalue
+  // and both members would be copied instead of moved.
+  explicit Message(Message &&other) noexcept
+      : topic_(std::move(other.topic_)),
+        data_(std::move(other.data_)) {
+  }
+  ~Message() {
+  }
+
+  // Move assignment; the parameter is non-const for the same reason as in the move constructor.
+  Message &operator=(Message &&other) noexcept {
+    topic_ = std::move(other.topic_);
+    data_ = std::move(other.data_);
+    return *this;
+  }
+  std::string topic_;
+  std::vector<uint8_t> data_;
+};
+
+/**
+ * MQTTContextService provides a controller service for MQTT connectivity.
+ *
+ * Messages are published through an MQTTClient handle. Messages handed to the
+ * receive callback are stored in one queue per topic. The delivery-token map
+ * and the topic queues are guarded by delivery_mutex_, and threads waiting for
+ * either block on delivery_notification_.
+ */
+class MQTTControllerService : public core::controller::ControllerService {
+ public:
+  explicit MQTTControllerService(const std::string &name, const std::string &id)
+      : ControllerService(name, id),
+        initialized_(false),
+        client_(nullptr),
+        keepAliveInterval_(0),
+        connectionTimeOut_(0),
+        qos_(2),
+        ssl_context_service_(nullptr),
+        logger_(logging::LoggerFactory<MQTTControllerService>::getLogger()) {
+  }
+
+  explicit MQTTControllerService(const std::string &name, uuid_t uuid = 0)
+      : ControllerService(name, uuid),
+        initialized_(false),
+        client_(nullptr),
+        keepAliveInterval_(0),
+        connectionTimeOut_(0),
+        qos_(2),
+        ssl_context_service_(nullptr),
+        logger_(logging::LoggerFactory<MQTTControllerService>::getLogger()) {
+  }
+
+  // Unlike the other constructors this one stores the configuration and runs initialize().
+  explicit MQTTControllerService(const std::string &name, const std::shared_ptr<Configure> &configuration)
+      : ControllerService(name, nullptr),
+        initialized_(false),
+        client_(nullptr),
+        keepAliveInterval_(0),
+        connectionTimeOut_(0),
+        qos_(2),
+        ssl_context_service_(nullptr),
+        logger_(logging::LoggerFactory<MQTTControllerService>::getLogger()) {
+    setConfiguration(configuration);
+    initialize();
+  }
+
+  static core::Property BrokerURL;
+  static core::Property ClientID;
+  static core::Property UserName;
+  static core::Property Password;
+  static core::Property CleanSession;
+  static core::Property KeepLiveInterval;
+  static core::Property ConnectionTimeOut;
+  static core::Property Topic;
+  static core::Property QOS;
+  static core::Property SecurityProtocol;
+
+  virtual void initialize();
+
+  void yield() {
+  }
+
+  /**
+   * Publishes the bytes of data on topic.
+   * @return the delivery token of the published message, or -1 when publishing failed.
+   */
+  int send(const std::string &topic, const std::vector<uint8_t> &data) {
+    return send(topic, data.data(), data.size());
+  }
+
+  /**
+   * Publishes dataSize bytes starting at data on topic, non-retained, with the
+   * configured quality of service.
+   * @return the delivery token of the published message, or -1 when publishing failed.
+   */
+  int send(const std::string &topic, const uint8_t *data, size_t dataSize) {
+    int token = 0;
+
+    MQTTClient_message pubmsg = MQTTClient_message_initializer;
+    // the client API takes a non-const payload pointer
+    pubmsg.payload = const_cast<uint8_t*>(data);
+    pubmsg.payloadlen = static_cast<int>(dataSize);
+    pubmsg.qos = static_cast<int>(qos_);
+    pubmsg.retained = 0;
+
+    if (MQTTClient_publishMessage(client_, topic.c_str(), &pubmsg, &token) != MQTTCLIENT_SUCCESS) {
+      return -1;
+    }
+
+    if (qos_ == 0) {
+      // at QoS 0 the token is recorded as delivered right away instead of
+      // waiting for the delivery callback
+      std::unique_lock<std::mutex> lock(delivery_mutex_);
+      delivered_[token] = true;
+    }
+    return token;
+  }
+
+  bool isRunning() {
+    return getState() == core::controller::ControllerServiceState::ENABLED;
+  }
+
+  bool isWorkAvailable() {
+    return false;
+  }
+
+  virtual void onEnable();
+
+  /**
+   * Subscribes to newTopic unless a queue for that topic already exists.
+   * The topic is only recorded once the subscription succeeded, so a failed
+   * attempt can be retried by calling this function again.
+   */
+  void subscribeToTopic(const std::string newTopic) {
+    // serializes concurrent subscribers
+    std::lock_guard<std::mutex> lock(initialization_mutex_);
+    {
+      // topics_ is shared with the receive callback and the waiters, all of
+      // which synchronize on delivery_mutex_
+      std::lock_guard<std::mutex> topics_lock(delivery_mutex_);
+      if (topics_.find(newTopic) != topics_.end()) {
+        return;
+      }
+    }
+    // delivery_mutex_ is deliberately not held during the subscribe call so
+    // that the receive callback is never blocked behind it
+    if (MQTTClient_subscribe(client_, newTopic.c_str(), static_cast<int>(qos_)) != MQTTCLIENT_SUCCESS) {
+      logger_->log_error("Failed to subscribe to MQTT topic %s", newTopic);
+      return;
+    }
+    std::lock_guard<std::mutex> topics_lock(delivery_mutex_);
+    // operator[] creates the (empty) queue of the topic
+    (void) topics_[newTopic];
+  }
+
+  /**
+   * Waits up to millisToWait milliseconds for the delivery of token to be
+   * acknowledged. The token is forgotten afterwards in every case.
+   * @return true if the delivery was acknowledged in time.
+   */
+  bool waitForDelivery(const uint64_t millisToWait, int token) {
+    std::unique_lock<std::mutex> lock(delivery_mutex_);
+    // operator[] registers the token, which is what allows acknowledgeDelivery() to mark it
+    const bool delivered = delivery_notification_.wait_for(lock, std::chrono::milliseconds(millisToWait), [&] {return delivered_[token] == true;});
+    delivered_.erase(token);
+    return delivered;
+  }
+
+  /**
+   * Waits up to millisToWait milliseconds for a message on topic and moves its
+   * payload into data.
+   * @return true if a message was dequeued; data is untouched otherwise.
+   */
+  bool get(const uint64_t millisToWait, const std::string &topic, std::vector<uint8_t> &data) {
+    std::unique_lock<std::mutex> lock(delivery_mutex_);
+    return awaitMessage(lock, millisToWait, topic, data);
+  }
+
+  /**
+   * Waits for the delivery of token and then for a message on topic. Each of
+   * the two waits may take up to millisToWait milliseconds.
+   * @return true if the delivery was acknowledged and a message was dequeued into data.
+   */
+  bool awaitResponse(const uint64_t millisToWait, int token, const std::string &topic, std::vector<uint8_t> &data) {
+    std::unique_lock<std::mutex> lock(delivery_mutex_);
+    const bool delivered = delivery_notification_.wait_for(lock, std::chrono::milliseconds(millisToWait), [&] {return delivered_[token] == true;});
+    // forget the token on every path so that delivered_ does not grow with each request
+    delivered_.erase(token);
+    if (!delivered) {
+      return false;
+    }
+    return awaitMessage(lock, millisToWait, topic, data);
+  }
+
+ protected:
+
+  /**
+   * Marks token as delivered and wakes the waiters.
+   * NOTE(review): a token is only present in delivered_ once a waiter asked
+   * for it (or send() ran at QoS 0), so an acknowledgement arriving earlier
+   * is dropped; confirm that callers always start waiting promptly.
+   */
+  void acknowledgeDelivery(MQTTClient_deliveryToken token) {
+    std::lock_guard<std::mutex> lock(delivery_mutex_);
+    // only acknowledge delivery if we expect the delivery to occur, otherwise
+    // we won't have any waiters.
+    auto finder = delivered_.find(token);
+    if (finder != delivered_.end()) {
+      finder->second = true;
+      // without a notification the waiters would sleep until their timeout
+      delivery_notification_.notify_all();
+    }
+  }
+
+  /**
+   * Stores message in the queue of topic and wakes the waiters.
+   */
+  void enqueue(const std::string &topic, Message &&message) {
+    std::unique_lock<std::mutex> lock(delivery_mutex_);
+    topics_[topic].enqueue(std::move(message));
+    // waiters for delivery tokens and for other topics share this condition
+    // variable, so waking a single one could pick a thread that cannot proceed
+    delivery_notification_.notify_all();
+  }
+
+  static void deliveryCallback(void *context, MQTTClient_deliveryToken dt) {
+    MQTTControllerService *service = (MQTTControllerService *) context;
+    service->acknowledgeDelivery(dt);
+  }
+
+  /**
+   * Copies the received message into the queue of its topic and releases the
+   * message and the topic name. Always returns 1.
+   */
+  static int receiveCallback(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
+    MQTTControllerService *service = (MQTTControllerService *) context;
+    // a topicLen of 0 means the length has to be taken from the terminated string
+    std::string topic(topicName, topicLen == 0 ? strlen(topicName) : topicLen);
+    Message queueMessage(topic, message->payload, message->payloadlen);
+    service->enqueue(topic, std::move(queueMessage));
+    MQTTClient_freeMessage(&message);
+    MQTTClient_free(topicName);
+    return 1;
+  }
+
+  static void reconnectCallback(void *context, char *cause) {
+    MQTTControllerService *service = (MQTTControllerService *) context;
+    service->reconnect();
+  }
+
+  /**
+   * Connects the client if it is not connected yet and subscribes to topic_
+   * when one is configured.
+   * @return true if the client is connected afterwards, false if there is no
+   * client or the connection attempt failed.
+   */
+  bool reconnect() {
+    if (!client_)
+      return false;
+    if (MQTTClient_isConnected(client_))
+      return true;
+    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
+    conn_opts.keepAliveInterval = keepAliveInterval_;
+    conn_opts.cleansession = 1;
+    if (!userName_.empty()) {
+      conn_opts.username = userName_.c_str();
+      conn_opts.password = passWord_.c_str();
+    }
+    if (ssl_context_service_ != nullptr)
+      conn_opts.ssl = &sslopts_;
+    if (MQTTClient_connect(client_, &conn_opts) != MQTTCLIENT_SUCCESS) {
+      logger_->log_error("Failed to connect to MQTT broker %s", uri_);
+      return false;
+    }
+
+    if (!topic_.empty()) {
+      std::unique_lock<std::mutex> lock(delivery_mutex_);
+      if (MQTTClient_subscribe(client_, topic_.c_str(), static_cast<int>(qos_)) != MQTTCLIENT_SUCCESS) {
+        // the connection itself succeeded, so only report the failure
+        logger_->log_error("Failed to subscribe to MQTT topic %s", topic_);
+      }
+    }
+    return true;
+  }
+
+  virtual void initializeProperties();
+
+  std::mutex initialization_mutex_;
+  std::atomic<bool> initialized_;
+
+  MQTTClient client_;
+  std::string uri_;
+  std::string topic_;
+  int64_t keepAliveInterval_;
+  int64_t connectionTimeOut_;
+  int64_t qos_;
+  std::string clientID_;
+  std::string userName_;
+  std::string passWord_;
+
+ private:
+
+  /**
+   * Waits up to millisToWait milliseconds for a message on topic and moves its
+   * payload into data. The caller must own delivery_mutex_ through lock.
+   * Waiting does not create a queue for the topic, so it cannot make
+   * subscribeToTopic() believe that the topic is already subscribed.
+   */
+  bool awaitMessage(std::unique_lock<std::mutex> &lock, const uint64_t millisToWait, const std::string &topic, std::vector<uint8_t> &data) {
+    const bool available = delivery_notification_.wait_for(lock, std::chrono::milliseconds(millisToWait), [&] {
+      const auto queue = topics_.find(topic);
+      return queue != topics_.end() && queue->second.size_approx() > 0;
+    });
+    if (!available) {
+      return false;
+    }
+    Message resp;
+    if (!topics_[topic].try_dequeue(resp)) {
+      return false;
+    }
+    data = std::move(resp.data_);
+    return true;
+  }
+
+  // delivery token -> whether its delivery has been acknowledged
+  std::map<int, bool> delivered_;
+  // topic -> messages received for it that were not fetched yet
+  std::map<std::string, moodycamel::ConcurrentQueue<Message> > topics_;
+
+  // guards delivered_ and topics_
+  std::mutex delivery_mutex_;
+  std::condition_variable delivery_notification_;
+
+  MQTTClient_SSLOptions sslopts_;
+
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+
+  std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_MQTTCONTEXTSERVICE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/AbstractMQTTProcessor.cpp ---------------------------------------------------------------------- diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp new file mode 100644 index 0000000..345c6c5 --- /dev/null +++ b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp @@ -0,0 +1,199 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include <stdio.h> +#include <memory> +#include <string> +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. 
See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Property AbstractMQTTProcessor::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", ""); +core::Property AbstractMQTTProcessor::SecurityCA("Security CA", "File or directory path to CA certificate(s) for verifying the broker's key", ""); +core::Property AbstractMQTTProcessor::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", ""); +core::Property AbstractMQTTProcessor::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", ""); +core::Property AbstractMQTTProcessor::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void 
AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); + MQTTClient_SSLOptions sslopts_ = MQTTClient_SSLOptions_initializer; + sslEnabled_ = false; +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { + uri_ = value; + logger_->log_debug("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { + clientID_ = value; + logger_->log_debug("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { + topic_ = value; + logger_->log_debug("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if (context->getProperty(UserName.getName(), value) && !value.empty()) { + userName_ = value; + logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", userName_); + } + value = ""; + if (context->getProperty(PassWord.getName(), value) && !value.empty()) { + passWord_ = value; + logger_->log_debug("AbstractMQTTProcessor: PassWord [%s]", passWord_); + } + value = ""; + if (context->getProperty(CleanSession.getName(), value) && !value.empty() && + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, cleanSession_)) { + 
logger_->log_debug("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_); + } + value = ""; + if (context->getProperty(KeepLiveInterval.getName(), value) && !value.empty()) { + core::TimeUnit unit; + if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) { + keepAliveInterval_ = valInt/1000; + logger_->log_debug("AbstractMQTTProcessor: KeepLiveInterval [%ll]", keepAliveInterval_); + } + } + value = ""; + if (context->getProperty(ConnectionTimeOut.getName(), value) && !value.empty()) { + core::TimeUnit unit; + if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) { + connectionTimeOut_ = valInt/1000; + logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeOut [%ll]", connectionTimeOut_); + } + } + value = ""; + if (context->getProperty(QOS.getName(), value) && !value.empty() && (value == MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) && + core::Property::StringToInt(value, valInt)) { + qos_ = valInt; + logger_->log_debug("AbstractMQTTProcessor: QOS [%ll]", qos_); + } + value = ""; + + if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) { + if (value == MQTT_SECURITY_PROTOCOL_SSL) { + sslEnabled_ = true; + logger_->log_debug("AbstractMQTTProcessor: ssl enable"); + value = ""; + if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) { + logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value); + securityCA_ = value; + sslopts_.trustStore = securityCA_.c_str(); + } + value = ""; + if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) { + logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", value); + securityCert_ = value; + sslopts_.keyStore = securityCert_.c_str(); + } + value = ""; + if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) { + logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", value); + 
securityPrivateKey_ = value; + sslopts_.privateKey = securityPrivateKey_.c_str(); + } + value = ""; + if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) { + logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", value); + securityPrivateKeyPassWord_ = value; + sslopts_.privateKeyPassword = securityPrivateKeyPassWord_.c_str(); + } + } + } + if (!client_) { + MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL); + } + if (client_) { + MQTTClient_setCallbacks(client_, (void *) this, connectionLost, msgReceived, msgDelivered); + // call reconnect to bootstrap + this->reconnect(); + } +} + +bool AbstractMQTTProcessor::reconnect() { + if (!client_) + return false; + if (MQTTClient_isConnected(client_)) + return true; + MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; + conn_opts.keepAliveInterval = keepAliveInterval_; + conn_opts.cleansession = cleanSession_; + if (!userName_.empty()) { + conn_opts.username = userName_.c_str(); + conn_opts.password = passWord_.c_str(); + } + if (sslEnabled_) + conn_opts.ssl = &sslopts_; + if (MQTTClient_connect(client_, &conn_opts) != MQTTCLIENT_SUCCESS) { + logger_->log_error("Failed to connect to MQTT broker %s", uri_); + return false; + } + if (isSubscriber_) { + MQTTClient_subscribe(client_, topic_.c_str(), qos_); + } + return true; +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/8dd7e91f/extensions/mqtt/processors/AbstractMQTTProcessor.h ---------------------------------------------------------------------- diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.h b/extensions/mqtt/processors/AbstractMQTTProcessor.h new file mode 100644 index 0000000..aab0ef5 --- /dev/null +++ b/extensions/mqtt/processors/AbstractMQTTProcessor.h @@ -0,0 +1,168 @@ +/** + * @file 
AbstractMQTTProcessor.h
+ * AbstractMQTTProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __ABSTRACTMQTT_H__
+#define __ABSTRACTMQTT_H__
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+
+#define MQTT_QOS_0 "0"
+#define MQTT_QOS_1 "1"
+#define MQTT_QOS_2 "2"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define MQTT_SECURITY_PROTOCOL_PLAINTEXT "plaintext"
+#define MQTT_SECURITY_PROTOCOL_SSL "ssl"
+
+/**
+ * Base class of the MQTT processors. It owns the MQTT client handle, holds the
+ * connection settings read in onSchedule() and provides the callbacks that are
+ * registered with the client.
+ */
+class AbstractMQTTProcessor : public core::Processor {
+ public:
+  /*!
+   * Create a new processor
+   */
+  explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL)
+      : core::Processor(name, uuid),
+        logger_(logging::LoggerFactory<AbstractMQTTProcessor>::getLogger()) {
+    client_ = nullptr;
+    // give every member a defined value; the callbacks and reconnect() may
+    // read them before any other function has assigned them
+    delivered_token_ = 0;
+    cleanSession_ = false;
+    keepAliveInterval_ = 60;
+    connectionTimeOut_ = 30;
+    qos_ = 0;
+    isSubscriber_ = false;
+    sslEnabled_ = false;
+    MQTTClient_SSLOptions defaultSslOptions = MQTTClient_SSLOptions_initializer;
+    sslopts_ = defaultSslOptions;
+  }
+  // Unsubscribes and disconnects a connected client, then destroys the client handle
+  virtual ~AbstractMQTTProcessor() {
+    if (client_ == nullptr) {
+      return;
+    }
+    if (MQTTClient_isConnected(client_)) {
+      if (isSubscriber_) {
+        MQTTClient_unsubscribe(client_, topic_.c_str());
+      }
+      // NOTE(review): connectionTimeOut_ is kept in seconds while the disconnect
+      // timeout looks like milliseconds - confirm the intended unit
+      MQTTClient_disconnect(client_, connectionTimeOut_);
+    }
+    MQTTClient_destroy(&client_);
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "AbstractMQTTProcessor";
+  // Supported Properties
+  static core::Property BrokerURL;
+  static core::Property ClientID;
+  static core::Property UserName;
+  static core::Property PassWord;
+  static core::Property CleanSession;
+  static core::Property KeepLiveInterval;
+  static core::Property ConnectionTimeOut;
+  static core::Property Topic;
+  static core::Property QOS;
+  static core::Property SecurityProtocol;
+  static core::Property SecurityCA;
+  static core::Property SecurityCert;
+  static core::Property SecurityPrivateKey;
+  static core::Property SecurityPrivateKeyPassWord;
+
+  // Supported Relationships
+  static core::Relationship Failure;
+  static core::Relationship Success;
+
+ public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  virtual void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  // OnTrigger method, does nothing in this base class
+  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  }
+  // OnTrigger method, does nothing in this base class
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  }
+  // Initialize, registers the supported properties and relationships
+  virtual void initialize(void);
+  // MQTT async callbacks
+  // Records the token of the most recently delivered message
+  static void msgDelivered(void *context, MQTTClient_deliveryToken dt) {
+    AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context;
+    processor->delivered_token_ = dt;
+  }
+  // Hands a received message to enqueueReceiveMQTTMsg() when the processor is a
+  // subscriber. The message is freed here unless that function returned true;
+  // the topic name is always freed.
+  static int msgReceived(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
+    AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context;
+    const bool accepted = processor->isSubscriber_ && processor->enqueueReceiveMQTTMsg(message);
+    if (!accepted) {
+      MQTTClient_freeMessage(&message);
+    }
+    MQTTClient_free(topicName);
+    return 1;
+  }
+  // Tries to re-establish the connection as soon as it is reported lost
+  static void connectionLost(void *context, char *cause) {
+    AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context;
+    processor->reconnect();
+  }
+  bool reconnect();
+  // enqueue receive MQTT message; returning false leaves the message with the
+  // caller, which frees it. This base implementation accepts nothing.
+  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message) {
+    return false;
+  }
+
+ protected:
+  MQTTClient client_;
+  MQTTClient_deliveryToken delivered_token_;
+  std::string uri_;
+  std::string topic_;
+  // in seconds
+  int64_t keepAliveInterval_;
+  // in seconds
+  int64_t connectionTimeOut_;
+  int64_t qos_;
+  bool cleanSession_;
+  std::string clientID_;
+  std::string userName_;
+  std::string passWord_;
+  bool isSubscriber_;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+  MQTTClient_SSLOptions sslopts_;
+  bool sslEnabled_;
+  // backing storage of the strings that sslopts_ points to
+  std::string securityCA_;
+  std::string securityCert_;
+  std::string securityPrivateKey_;
+  std::string securityPrivateKeyPassWord_;
+};
+
+REGISTER_RESOURCE(AbstractMQTTProcessor);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
